diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index ebf3caccd9c62..b1ec4456adcca 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -14,20 +14,39 @@ jobs:
build:
runs-on: ubuntu-latest
strategy:
+ max-parallel: 7
matrix:
include:
- - scala: "scala-2.11"
- spark: "spark2"
- - scala: "scala-2.11"
- spark: "spark2,spark-shade-unbundle-avro"
- - scala: "scala-2.12"
- spark: "spark3.1.x"
- - scala: "scala-2.12"
- spark: "spark3.1.x,spark-shade-unbundle-avro"
- - scala: "scala-2.12"
- spark: "spark3"
- - scala: "scala-2.12"
- spark: "spark3,spark-shade-unbundle-avro"
+ # Spark 2.4.4
+ - scalaProfile: "scala-2.11"
+ sparkProfile: "spark2"
+ sparkVersion: "2.4.4"
+
+ # Spark 3.1.x
+ - scalaProfile: "scala-2.12"
+ sparkProfile: "spark3.1.x"
+ sparkVersion: "3.1.0"
+
+ - scalaProfile: "scala-2.12"
+ sparkProfile: "spark3.1.x"
+ sparkVersion: "3.1.1"
+
+ - scalaProfile: "scala-2.12"
+ sparkProfile: "spark3.1.x"
+ sparkVersion: "3.1.2"
+
+ - scalaProfile: "scala-2.12"
+ sparkProfile: "spark3.1.x"
+ sparkVersion: "3.1.3"
+
+ # Spark 3.2.x
+ - scalaProfile: "scala-2.12"
+ sparkProfile: "spark3.2.0"
+ sparkVersion: "3.2.0"
+
+ - scalaProfile: "scala-2.12"
+ sparkProfile: "spark3"
+ sparkVersion: "3.2.1"
steps:
- uses: actions/checkout@v2
- name: Set up JDK 8
@@ -38,6 +57,16 @@ jobs:
architecture: x64
- name: Build Project
env:
- SCALA_PROFILE: ${{ matrix.scala }}
- SPARK_PROFILE: ${{ matrix.spark }}
- run: mvn install -P "$SCALA_PROFILE,$SPARK_PROFILE" -DskipTests=true -Dmaven.javadoc.skip=true -B -V
+ SCALA_PROFILE: ${{ matrix.scalaProfile }}
+ SPARK_PROFILE: ${{ matrix.sparkProfile }}
+ SPARK_VERSION: ${{ matrix.sparkVersion }}
+ run:
+ mvn -T 2.5C clean install -P "$SCALA_PROFILE,$SPARK_PROFILE" -Dspark.version="$SPARK_VERSION" -DskipTests=true -Dmaven.javadoc.skip=true -B -V
+ - name: Quickstart Test
+ env:
+ SCALA_PROFILE: ${{ matrix.scalaProfile }}
+ SPARK_PROFILE: ${{ matrix.sparkProfile }}
+ SPARK_VERSION: ${{ matrix.sparkVersion }}
+ if: ${{ !startsWith(env.SPARK_VERSION, '3.2.') }} # skip tests for Spark 3.2 until Hadoop is upgraded to 3.x
+ run:
+ mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dspark.version="$SPARK_VERSION" -DfailIfNoTests=false -pl hudi-examples/hudi-examples-flink,hudi-examples/hudi-examples-java,hudi-examples/hudi-examples-spark
diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index 5f21b29bb6adf..f1b25db20473f 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -26,6 +26,7 @@ variables:
SPARK_VERSION: '2.4.4'
HADOOP_VERSION: '2.7'
SPARK_ARCHIVE: spark-$(SPARK_VERSION)-bin-hadoop$(HADOOP_VERSION)
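+ # Maven -pl exclusion list shared by the unit-test and functional-test steps below.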
+ EXCLUDE_TESTED_MODULES: '!hudi-examples/hudi-examples-common,!hudi-examples/hudi-examples-flink,!hudi-examples/hudi-examples-java,!hudi-examples/hudi-examples-spark,!hudi-common,!hudi-flink-datasource/hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync'
stages:
- stage: test
@@ -132,7 +133,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
- options: -Punit-tests -pl !hudi-common,!hudi-flink-datasource/hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync
+ options: -Punit-tests -pl $(EXCLUDE_TESTED_MODULES)
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -141,7 +142,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
- options: -Pfunctional-tests -pl !hudi-common,!hudi-flink-datasource/hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync
+ options: -Pfunctional-tests -pl $(EXCLUDE_TESTED_MODULES)
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
diff --git a/hudi-examples/hudi-examples-common/pom.xml b/hudi-examples/hudi-examples-common/pom.xml
new file mode 100644
index 0000000000000..9c327672226bd
--- /dev/null
+++ b/hudi-examples/hudi-examples-common/pom.xml
@@ -0,0 +1,109 @@
+
+
+
+
+ hudi-examples
+ org.apache.hudi
+ 0.11.0-SNAPSHOT
+
+ 4.0.0
+
+ hudi-examples-common
+
+
+ ${project.parent.basedir}
+ true
+
+
+
+
+
+ src/main/resources
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
+ scala-compile-first
+ process-resources
+
+ add-source
+ compile
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ compile
+
+ compile
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+ test-compile
+
+
+
+ false
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+
+
+
+
+
+
+ org.apache.hudi
+ hudi-common
+ ${project.version}
+
+
+
+
+ org.apache.avro
+ avro
+
+
+
+ org.apache.parquet
+ parquet-avro
+
+
+
diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java b/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
similarity index 99%
rename from hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
rename to hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
index 78df2e78e7081..2c94c8bc43d44 100644
--- a/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
+++ b/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
@@ -43,7 +43,6 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;
-
/**
* Class to be used to generate test data.
*/
@@ -63,7 +62,7 @@ public class HoodieExampleDataGenerator<T extends HoodieRecordPayload<T>> {
+ "{\"name\":\"fare\",\"type\": \"double\"}]}";
public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
- private static Random rand = new Random(46474747);
+ private static final Random rand = new Random(46474747);
private final Map<Integer, KeyPartition> existingKeys;
private final String[] partitionPaths;
diff --git a/hudi-examples/hudi-examples-flink/pom.xml b/hudi-examples/hudi-examples-flink/pom.xml
new file mode 100644
index 0000000000000..7aa6c05d5e382
--- /dev/null
+++ b/hudi-examples/hudi-examples-flink/pom.xml
@@ -0,0 +1,364 @@
+
+
+
+
+ hudi-examples
+ org.apache.hudi
+ 0.11.0-SNAPSHOT
+
+ 4.0.0
+
+ hudi-examples-flink
+
+
+ ${project.parent.basedir}
+ true
+ 1.11.1
+
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+ 1.8
+ 1.8
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+ 3.1.2
+
+
+
+ test-jar
+
+
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+
+
+
+
+
+ src/main/resources
+
+
+ src/test/resources
+
+
+
+
+
+
+
+ org.apache.hudi
+ hudi-common
+ ${project.version}
+
+
+ org.apache.hudi
+ hudi-client-common
+ ${project.version}
+
+
+ org.apache.hudi
+ hudi-flink-client
+ ${project.version}
+
+
+ org.apache.hudi
+ hudi-hadoop-mr
+ ${project.version}
+
+
+ org.apache.hudi
+ hudi-hive-sync
+ ${project.version}
+
+
+ org.apache.hudi
+ hudi-sync-common
+ ${project.version}
+
+
+
+ org.apache.hudi
+ hudi-flink
+ ${project.version}
+ compile
+
+
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ compile
+
+
+ org.apache.flink
+ flink-clients_${scala.binary.version}
+ compile
+
+
+ com.esotericsoftware.kryo
+ kryo
+
+
+ com.esotericsoftware.minlog
+ minlog
+
+
+
+
+ org.apache.flink
+ flink-connector-kafka_${scala.binary.version}
+ compile
+
+
+ org.apache.kafka
+ kafka-clients
+ ${kafka.version}
+
+
+ org.apache.flink
+ flink-hadoop-compatibility_${scala.binary.version}
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-parquet_${scala.binary.version}
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-json
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-table-common
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-table-runtime_${scala.binary.version}
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-table-planner_${scala.binary.version}
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-statebackend-rocksdb_${scala.binary.version}
+ ${flink.version}
+ provided
+
+
+
+ org.apache.parquet
+ parquet-hadoop
+ ${parquet.version}
+
+
+ org.xerial.snappy
+ snappy-java
+
+
+
+
+
+
+ org.apache.avro
+ avro
+
+ 1.10.0
+ compile
+
+
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-core
+ compile
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
+
+
+ com.beust
+ jcommander
+ compile
+
+
+ com.twitter
+ bijection-avro_${scala.binary.version}
+ 0.9.7
+
+
+ joda-time
+ joda-time
+ 2.5
+
+
+
+ ${hive.groupid}
+ hive-exec
+ ${hive.version}
+ ${hive.exec.classifier}
+
+
+ javax.mail
+ mail
+
+
+ org.eclipse.jetty.aggregate
+ *
+
+
+
+
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ test
+
+
+ org.junit.vintage
+ junit-vintage-engine
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-params
+ test
+
+
+
+
+ org.apache.hudi
+ hudi-common
+ ${project.version}
+ tests
+ test-jar
+ test
+
+
+ org.apache.hudi
+ hudi-client-common
+ ${project.version}
+ tests
+ test-jar
+ test
+
+
+ org.apache.hudi
+ hudi-flink-client
+ ${project.version}
+ tests
+ test-jar
+ test
+
+
+
+
+
+
+
+
+
+
+ org.apache.flink
+ flink-test-utils_${scala.binary.version}
+ ${flink.version}
+ test
+
+
+ org.apache.flink
+ flink-runtime
+ ${flink.version}
+ test
+ test-jar
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ ${flink.version}
+ test
+ test-jar
+
+
+ org.apache.flink
+ flink-table-runtime_${scala.binary.version}
+ ${flink.version}
+ test
+ test-jar
+
+
+ org.apache.flink
+ flink-json
+ ${flink.version}
+ test
+ test-jar
+
+
+ org.apache.flink
+ flink-csv
+ ${flink.version}
+ test
+
+
+
+
+ org.apache.parquet
+ parquet-avro
+ ${parquet.version}
+ test
+
+
+
diff --git a/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/HoodieFlinkQuickstart.java b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/HoodieFlinkQuickstart.java
new file mode 100644
index 0000000000000..b3e105015a58c
--- /dev/null
+++ b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/HoodieFlinkQuickstart.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.examples.quickstart;
+
+import static org.apache.hudi.examples.quickstart.utils.QuickstartConfigurations.sql;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.types.Row;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.examples.quickstart.factory.CollectSinkTableFactory;
+import org.apache.hudi.examples.quickstart.utils.QuickstartConfigurations;
+import org.jetbrains.annotations.NotNull;
+
+public final class HoodieFlinkQuickstart {
+ private EnvironmentSettings settings = null;
+ private TableEnvironment streamTableEnv = null;
+
+ private String tableName;
+
+ private HoodieFlinkQuickstart() {
+ }
+
+ public static HoodieFlinkQuickstart instance() {
+ return new HoodieFlinkQuickstart();
+ }
+
+ public static void main(String[] args) throws TableNotExistException, InterruptedException {
+ if (args.length < 3) {
+ System.err.println("Usage: HoodieFlinkQuickstart <tablePath> <tableName> <tableType>");
+ System.exit(1);
+ }
+ String tablePath = args[0];
+ String tableName = args[1];
+ String tableType = args[2];
+
+ HoodieFlinkQuickstart flinkQuickstart = instance();
+ flinkQuickstart.initEnv();
+
+ // create filesystem table named source
+ flinkQuickstart.createFileSource();
+
+ // create hudi table
+ flinkQuickstart.createHudiTable(tablePath, tableName, HoodieTableType.valueOf(tableType));
+
+ // insert data
+ flinkQuickstart.insertData();
+
+ // query data
+ flinkQuickstart.queryData();
+
+ // update data
+ flinkQuickstart.updateData();
+ }
+
+ public void initEnv() {
+ if (this.streamTableEnv == null) {
+ settings = EnvironmentSettings.newInstance().build();
+ TableEnvironment streamTableEnv = TableEnvironmentImpl.create(settings);
+ streamTableEnv.getConfig().getConfiguration()
+ .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ Configuration execConf = streamTableEnv.getConfig().getConfiguration();
+ execConf.setString("execution.checkpointing.interval", "2s");
+ // configure not to retry after failure
+ execConf.setString("restart-strategy", "fixed-delay");
+ execConf.setString("restart-strategy.fixed-delay.attempts", "0");
+ this.streamTableEnv = streamTableEnv;
+ }
+ }
+
+ public TableEnvironment getStreamTableEnv() {
+ return streamTableEnv;
+ }
+
+ public TableEnvironment getBatchTableEnv() {
+ Configuration conf = new Configuration();
+ // For batch upsert use cases the current suggestion is to disable these two options.
+ // Since Flink 1.14 the runtime execution mode for batch jobs has switched from streaming
+ // to batch (before that, both streaming and batch jobs used the streaming execution mode),
+ // and the batch execution mode has these limitations:
+ //
+ // 1. the keyed stream always sorts its inputs by key;
+ // 2. the batch state-backend requires the inputs to be sorted by the state key.
+ //
+ // Our Hudi batch upsert pipeline relies on the consuming sequence of index records and data records:
+ // the index records must be loaded before the data records for BucketAssignFunction to keep the upsert
+ // semantics correct, so we disable both options to keep using the streaming state-backend in batch
+ // execution mode, i.e. the strategy used before 1.14.
+ conf.setBoolean("execution.sorted-inputs.enabled", false);
+ conf.setBoolean("execution.batch-state-backend.enabled", false);
+ StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+ settings = EnvironmentSettings.newInstance().inBatchMode().build();
+ TableEnvironment batchTableEnv = StreamTableEnvironment.create(execEnv, settings);
+ batchTableEnv.getConfig().getConfiguration()
+ .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ return batchTableEnv;
+ }
+
+ public void createHudiTable(String tablePath, String tableName,
+ HoodieTableType tableType) {
+ this.tableName = tableName;
+
+ // create hudi table
+ String hoodieTableDDL = sql(tableName)
+ .option(FlinkOptions.PATH, tablePath)
+ .option(FlinkOptions.READ_AS_STREAMING, true)
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .end();
+ streamTableEnv.executeSql(hoodieTableDDL);
+ }
+
+ public void createFileSource() {
+ // create filesystem table named source
+ String createSource = QuickstartConfigurations.getFileSourceDDL("source");
+ streamTableEnv.executeSql(createSource);
+ }
+
+ @NotNull List<Row> insertData() throws InterruptedException, TableNotExistException {
+ // insert data
+ String insertInto = String.format("insert into %s select * from source", tableName);
+ execInsertSql(streamTableEnv, insertInto);
+ return queryData();
+ }
+
+ List<Row> queryData() throws InterruptedException, TableNotExistException {
+ // query data
+ // read from the latest commit instant.
+ return execSelectSql(streamTableEnv, String.format("select * from %s", tableName), 10);
+ }
+
+ @NotNull List<Row> updateData() throws InterruptedException, TableNotExistException {
+ // update data
+ String insertInto = String.format("insert into %s select * from source", tableName);
+ execInsertSql(getStreamTableEnv(), insertInto);
+ return queryData();
+ }
+
+ public static void execInsertSql(TableEnvironment tEnv, String insert) {
+ TableResult tableResult = tEnv.executeSql(insert);
+ // wait to finish
+ try {
+ tableResult.getJobClient().get().getJobExecutionResult().get();
+ } catch (InterruptedException | ExecutionException ex) {
+ // ignored
+ }
+ }
+
+ public static List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout)
+ throws InterruptedException, TableNotExistException {
+ return execSelectSql(tEnv, select, timeout, null);
+ }
+
+ public static List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout, String sourceTable)
+ throws InterruptedException, TableNotExistException {
+ final String sinkDDL;
+ if (sourceTable != null) {
+ // use the source table schema as the sink schema if a source table was specified
+ ObjectPath objectPath = new ObjectPath(tEnv.getCurrentDatabase(), sourceTable);
+ TableSchema schema = tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTable(objectPath).getSchema();
+ sinkDDL = QuickstartConfigurations.getCollectSinkDDL("sink", schema);
+ } else {
+ sinkDDL = QuickstartConfigurations.getCollectSinkDDL("sink");
+ }
+ return execSelectSql(tEnv, select, sinkDDL, timeout);
+ }
+
+ public static List<Row> execSelectSql(TableEnvironment tEnv, String select, String sinkDDL, long timeout)
+ throws InterruptedException {
+ tEnv.executeSql("DROP TABLE IF EXISTS sink");
+ tEnv.executeSql(sinkDDL);
+ TableResult tableResult = tEnv.executeSql("insert into sink " + select);
+ // wait for the timeout, then cancel the job
+ TimeUnit.SECONDS.sleep(timeout);
+ tableResult.getJobClient().ifPresent(JobClient::cancel);
+ tEnv.executeSql("DROP TABLE IF EXISTS sink");
+ return CollectSinkTableFactory.RESULT.values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ }
+}
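
For reference, a minimal sketch of driving the quickstart class above from user code, mirroring the steps in main(); the driver class name and the local table path are placeholders, and it must sit in the same package since insertData()/queryData()/updateData() are package-private:

    package org.apache.hudi.examples.quickstart;

    import org.apache.hudi.common.model.HoodieTableType;

    public class QuickstartDriver {
      public static void main(String[] args) throws Exception {
        HoodieFlinkQuickstart quickstart = HoodieFlinkQuickstart.instance();
        quickstart.initEnv();              // build the streaming TableEnvironment once
        quickstart.createFileSource();     // register the bundled JSON data as the "source" table
        quickstart.createHudiTable("file:///tmp/hudi-t1", "t1", HoodieTableType.COPY_ON_WRITE);
        quickstart.insertData();           // insert from "source" and read back the latest commit
        quickstart.queryData();            // stream-read the table
        quickstart.updateData();           // re-run the insert, which upserts by the uuid key
      }
    }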
diff --git a/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/factory/CollectSinkTableFactory.java b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/factory/CollectSinkTableFactory.java
new file mode 100644
index 0000000000000..5687a7c146720
--- /dev/null
+++ b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/factory/CollectSinkTableFactory.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.examples.quickstart.factory;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Factory for CollectTableSink.
+ *
+ * <p>Note: The CollectTableSink collects all the data of a table into a global collection {@code RESULT},
+ * so the tests should be executed in a single thread and the table name should be the same.
+ */
+public class CollectSinkTableFactory implements DynamicTableSinkFactory {
+ public static final String FACTORY_ID = "collect";
+
+ // global results to collect and query
+ public static final Map<Integer, List<Row>> RESULT = new HashMap<>();
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validate();
+
+ TableSchema schema = context.getCatalogTable().getSchema();
+ RESULT.clear();
+ return new CollectTableSink(schema, context.getObjectIdentifier().getObjectName());
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return FACTORY_ID;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return Collections.emptySet();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Table sinks
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * A {@link DynamicTableSink} that collects rows for testing.
+ */
+ private static class CollectTableSink implements DynamicTableSink {
+
+ private final TableSchema schema;
+ private final String tableName;
+
+ private CollectTableSink(
+ TableSchema schema,
+ String tableName) {
+ this.schema = schema;
+ this.tableName = tableName;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.DELETE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .build();
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ final DataType rowType = schema.toPhysicalRowDataType();
+ final RowTypeInfo rowTypeInfo = (RowTypeInfo) TypeConversions.fromDataTypeToLegacyInfo(rowType);
+ DataStructureConverter converter = context.createDataStructureConverter(schema.toPhysicalRowDataType());
+ return SinkFunctionProvider.of(new CollectSinkFunction(converter, rowTypeInfo));
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return new CollectTableSink(schema, tableName);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "CollectSink";
+ }
+ }
+
+ static class CollectSinkFunction extends RichSinkFunction<RowData> implements CheckpointedFunction {
+
+ private static final long serialVersionUID = 1L;
+ private final DynamicTableSink.DataStructureConverter converter;
+ private final RowTypeInfo rowTypeInfo;
+
+ protected transient ListState<Row> resultState;
+ protected transient List<Row> localResult;
+
+ private int taskID;
+
+ protected CollectSinkFunction(DynamicTableSink.DataStructureConverter converter, RowTypeInfo rowTypeInfo) {
+ this.converter = converter;
+ this.rowTypeInfo = rowTypeInfo;
+ }
+
+ @Override
+ public void invoke(RowData value, Context context) {
+ Row row = (Row) converter.toExternal(value);
+ assert row != null;
+ row.setKind(value.getRowKind());
+ RESULT.get(taskID).add(row);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ this.resultState = context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>("sink-results", rowTypeInfo));
+ this.localResult = new ArrayList<>();
+ if (context.isRestored()) {
+ for (Row value : resultState.get()) {
+ localResult.add(value);
+ }
+ }
+ this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+ synchronized (CollectSinkTableFactory.class) {
+ RESULT.put(taskID, localResult);
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ resultState.clear();
+ resultState.addAll(RESULT.get(taskID));
+ }
+ }
+}
diff --git a/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/factory/ContinuousFileSourceFactory.java b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/factory/ContinuousFileSourceFactory.java
new file mode 100644
index 0000000000000..834fa9f252fd5
--- /dev/null
+++ b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/factory/ContinuousFileSourceFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.examples.quickstart.factory;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.examples.quickstart.source.ContinuousFileSource;
+
+/**
+ * Factory for ContinuousFileSource.
+ */
+public class ContinuousFileSourceFactory implements DynamicTableSourceFactory {
+ public static final String FACTORY_ID = "continuous-file-source";
+
+ public static final ConfigOption<Integer> CHECKPOINTS = ConfigOptions
+ .key("checkpoints")
+ .intType()
+ .defaultValue(2)
+ .withDescription("Number of checkpoints to write the data set as, default 2");
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validate();
+
+ Configuration conf = (Configuration) helper.getOptions();
+ Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
+ new ValidationException("Option [path] should be not empty.")));
+ return new ContinuousFileSource(context.getCatalogTable().getResolvedSchema(), path, conf);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return FACTORY_ID;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.singleton(FlinkOptions.PATH);
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return Collections.singleton(CHECKPOINTS);
+ }
+}
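
For reference, the DDL this factory resolves looks like the sketch below; the helper method and the file path are placeholders, and QuickstartConfigurations.getFileSourceDDL (later in this patch) generates the same shape:

    // Hypothetical helper: registers a table backed by the continuous-file-source connector.
    static void registerFileSource(org.apache.flink.table.api.TableEnvironment tEnv) {
      String sourceDDL = "create table source(\n"
          + "  uuid varchar(20),\n"
          + "  name varchar(10),\n"
          + "  age int,\n"
          + "  ts timestamp(3),\n"
          + "  `partition` varchar(20)\n"
          + ") with (\n"
          + "  'connector' = '" + ContinuousFileSourceFactory.FACTORY_ID + "',\n"
          + "  'path' = 'file:///tmp/source-file.json',\n" // placeholder path to the JSON data file
          + "  'checkpoints' = '4'\n"                      // split the file into 4 checkpoint batches
          + ")";
      tEnv.executeSql(sourceDDL);
    }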
diff --git a/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/source/ContinuousFileSource.java b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/source/ContinuousFileSource.java
new file mode 100644
index 0000000000000..b457a7e6080ab
--- /dev/null
+++ b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/source/ContinuousFileSource.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.examples.quickstart.source;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hudi.examples.quickstart.factory.ContinuousFileSourceFactory.CHECKPOINTS;
+
+/**
+ * A continuous file source that can trigger checkpoints continuously.
+ *
+ * <p>It loads the data in the specified file and splits the data into the given number of checkpoint batches.
+ * Say, if you want 4 checkpoints and there are 8 records in the file, the emit strategy is:
+ *
+ * <pre>
+ * | 2 records | 2 records | 2 records | 2 records |
+ * |    cp1    |    cp2    |    cp3    |    cp4    |
+ * </pre>
+ *
+ * <p>If all the data is flushed out, it waits for the next checkpoint to finish and then tears down the source.
+ */
+public class ContinuousFileSource implements ScanTableSource {
+
+ private final ResolvedSchema tableSchema;
+ private final Path path;
+ private final Configuration conf;
+
+ public ContinuousFileSource(
+ ResolvedSchema tableSchema,
+ Path path,
+ Configuration conf) {
+ this.tableSchema = tableSchema;
+ this.path = path;
+ this.conf = conf;
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+ return new DataStreamScanProvider() {
+
+ @Override
+ public boolean isBounded() {
+ return false;
+ }
+
+ @Override
+ public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
+ final RowType rowType = (RowType) tableSchema.toSourceRowDataType().getLogicalType();
+ JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+ rowType,
+ InternalTypeInfo.of(rowType),
+ false,
+ true,
+ TimestampFormat.ISO_8601);
+
+ return execEnv.addSource(new BoundedSourceFunction(path, conf.getInteger(CHECKPOINTS)))
+ .name("continuous_file_source")
+ .setParallelism(1)
+ .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)),
+ InternalTypeInfo.of(rowType));
+ }
+ };
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ return new ContinuousFileSource(this.tableSchema, this.path, this.conf);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "ContinuousFileSource";
+ }
+
+ /**
+ * Source function that partitions the data into the given number of checkpoint batches.
+ */
+ public static class BoundedSourceFunction implements SourceFunction<String>, CheckpointListener {
+ private final Path path;
+ private List<String> dataBuffer;
+
+ private final int checkpoints;
+ private final AtomicInteger currentCP = new AtomicInteger(0);
+
+ private volatile boolean isRunning = true;
+
+ public BoundedSourceFunction(Path path, int checkpoints) {
+ this.path = path;
+ this.checkpoints = checkpoints;
+ }
+
+ @Override
+ public void run(SourceContext<String> context) throws Exception {
+ if (this.dataBuffer == null) {
+ loadDataBuffer();
+ }
+ int oldCP = this.currentCP.get();
+ boolean finish = false;
+ while (isRunning) {
+ int batchSize = this.dataBuffer.size() / this.checkpoints;
+ int start = batchSize * oldCP;
+ synchronized (context.getCheckpointLock()) {
+ for (int i = start; i < start + batchSize; i++) {
+ if (i >= this.dataBuffer.size()) {
+ // wait for the next checkpoint and exit
+ finish = true;
+ break;
+ }
+ context.collect(this.dataBuffer.get(i));
+ }
+ }
+ oldCP++;
+ while (this.currentCP.get() < oldCP) {
+ synchronized (context.getCheckpointLock()) {
+ context.getCheckpointLock().wait(10);
+ }
+ }
+ if (finish || !isRunning) {
+ return;
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ this.isRunning = false;
+ }
+
+ private void loadDataBuffer() {
+ try {
+ this.dataBuffer = Files.readAllLines(Paths.get(this.path.toUri()));
+ } catch (IOException e) {
+ throw new RuntimeException("Read file " + this.path + " error", e);
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long l) {
+ this.currentCP.incrementAndGet();
+ }
+ }
+}
diff --git a/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/utils/QuickstartConfigurations.java b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/utils/QuickstartConfigurations.java
new file mode 100644
index 0000000000000..8dfd9df9eb479
--- /dev/null
+++ b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/utils/QuickstartConfigurations.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.examples.quickstart.utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.examples.quickstart.factory.CollectSinkTableFactory;
+import org.apache.hudi.examples.quickstart.factory.ContinuousFileSourceFactory;
+import org.apache.hudi.streamer.FlinkStreamerConfig;
+
+/**
+ * Configurations for the test.
+ */
+public class QuickstartConfigurations {
+ private QuickstartConfigurations() {
+ }
+
+ public static final DataType ROW_DATA_TYPE = DataTypes.ROW(
+ DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key
+ DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
+ DataTypes.FIELD("age", DataTypes.INT()),
+ DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field
+ DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
+ .notNull();
+
+ public static final RowType ROW_TYPE = (RowType) ROW_DATA_TYPE.getLogicalType();
+
+ public static final ResolvedSchema TABLE_SCHEMA = SchemaBuilder.instance()
+ .fields(ROW_TYPE.getFieldNames(), ROW_DATA_TYPE.getChildren())
+ .build();
+
+ private static final List<String> FIELDS = ROW_TYPE.getFields().stream()
+ .map(RowType.RowField::asSummaryString).collect(Collectors.toList());
+
+ public static final DataType ROW_DATA_TYPE_WIDER = DataTypes.ROW(
+ DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key
+ DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
+ DataTypes.FIELD("age", DataTypes.INT()),
+ DataTypes.FIELD("salary", DataTypes.DOUBLE()),
+ DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field
+ DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
+ .notNull();
+
+ public static final RowType ROW_TYPE_WIDER = (RowType) ROW_DATA_TYPE_WIDER.getLogicalType();
+
+ public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) {
+ return getCreateHoodieTableDDL(tableName, options, true, "partition");
+ }
+
+ public static String getCreateHoodieTableDDL(
+ String tableName,
+ Map<String, String> options,
+ boolean havePartition,
+ String partitionField) {
+ return getCreateHoodieTableDDL(tableName, FIELDS, options, havePartition, "uuid", partitionField);
+ }
+
+ public static String getCreateHoodieTableDDL(
+ String tableName,
+ List<String> fields,
+ Map<String, String> options,
+ boolean havePartition,
+ String pkField,
+ String partitionField) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("create table ").append(tableName).append("(\n");
+ for (String field : fields) {
+ builder.append(" ").append(field).append(",\n");
+ }
+ builder.append(" PRIMARY KEY(").append(pkField).append(") NOT ENFORCED\n")
+ .append(")\n");
+ if (havePartition) {
+ builder.append("PARTITIONED BY (`").append(partitionField).append("`)\n");
+ }
+ final String connector = options.computeIfAbsent("connector", k -> "hudi");
+ builder.append("with (\n"
+ + " 'connector' = '").append(connector).append("'");
+ options.forEach((k, v) -> builder.append(",\n")
+ .append(" '").append(k).append("' = '").append(v).append("'"));
+ builder.append("\n)");
+ return builder.toString();
+ }
+
+ public static String getCreateHudiCatalogDDL(final String catalogName, final String catalogPath) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("create catalog ").append(catalogName).append(" with (\n");
+ builder.append(" 'type' = 'hudi',\n"
+ + " 'catalog.path' = '").append(catalogPath).append("'");
+ builder.append("\n)");
+ return builder.toString();
+ }
+
+ public static String getFileSourceDDL(String tableName) {
+ return getFileSourceDDL(tableName, "source-file.json");
+ }
+
+ public static String getFileSourceDDL(String tableName, int checkpoints) {
+ return getFileSourceDDL(tableName, "source-file.json", checkpoints);
+ }
+
+ public static String getFileSourceDDL(String tableName, String fileName) {
+ return getFileSourceDDL(tableName, fileName, 2);
+ }
+
+ public static String getFileSourceDDL(String tableName, String fileName, int checkpoints) {
+ String sourcePath = Objects.requireNonNull(Thread.currentThread()
+ .getContextClassLoader().getResource(fileName)).toString();
+ return "create table " + tableName + "(\n"
+ + " uuid varchar(20),\n"
+ + " name varchar(10),\n"
+ + " age int,\n"
+ + " ts timestamp(3),\n"
+ + " `partition` varchar(20)\n"
+ + ") with (\n"
+ + " 'connector' = '" + ContinuousFileSourceFactory.FACTORY_ID + "',\n"
+ + " 'path' = '" + sourcePath + "',\n"
+ + " 'checkpoints' = '" + checkpoints + "'\n"
+ + ")";
+ }
+
+ public static String getCollectSinkDDL(String tableName) {
+ return "create table " + tableName + "(\n"
+ + " uuid varchar(20),\n"
+ + " name varchar(10),\n"
+ + " age int,\n"
+ + " ts timestamp(3),\n"
+ + " `partition` varchar(20)\n"
+ + ") with (\n"
+ + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'"
+ + ")";
+ }
+
+ public static String getCollectSinkDDL(String tableName, TableSchema tableSchema) {
+ final StringBuilder builder = new StringBuilder("create table " + tableName + "(\n");
+ String[] fieldNames = tableSchema.getFieldNames();
+ DataType[] fieldTypes = tableSchema.getFieldDataTypes();
+ for (int i = 0; i < fieldNames.length; i++) {
+ builder.append(" `")
+ .append(fieldNames[i])
+ .append("` ")
+ .append(fieldTypes[i].toString());
+ if (i != fieldNames.length - 1) {
+ builder.append(",");
+ }
+ builder.append("\n");
+ }
+ final String withProps = ""
+ + ") with (\n"
+ + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'\n"
+ + ")";
+ builder.append(withProps);
+ return builder.toString();
+ }
+
+ public static String getCsvSourceDDL(String tableName, String fileName) {
+ String sourcePath = Objects.requireNonNull(Thread.currentThread()
+ .getContextClassLoader().getResource(fileName)).toString();
+ return "create table " + tableName + "(\n"
+ + " uuid varchar(20),\n"
+ + " name varchar(10),\n"
+ + " age int,\n"
+ + " ts timestamp(3),\n"
+ + " `partition` varchar(20)\n"
+ + ") with (\n"
+ + " 'connector' = 'filesystem',\n"
+ + " 'path' = '" + sourcePath + "',\n"
+ + " 'format' = 'csv'\n"
+ + ")";
+ }
+
+ public static final RowDataSerializer SERIALIZER = new RowDataSerializer(ROW_TYPE);
+
+ public static Configuration getDefaultConf(String tablePath) {
+ Configuration conf = new Configuration();
+ conf.setString(FlinkOptions.PATH, tablePath);
+ conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH,
+ Objects.requireNonNull(Thread.currentThread()
+ .getContextClassLoader().getResource("test_read_schema.avsc")).toString());
+ conf.setString(FlinkOptions.TABLE_NAME, "TestHoodieTable");
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
+ return conf;
+ }
+
+ public static FlinkStreamerConfig getDefaultStreamerConf(String tablePath) {
+ FlinkStreamerConfig streamerConf = new FlinkStreamerConfig();
+ streamerConf.targetBasePath = tablePath;
+ streamerConf.sourceAvroSchemaPath = Objects.requireNonNull(Thread.currentThread()
+ .getContextClassLoader().getResource("test_read_schema.avsc")).toString();
+ streamerConf.targetTableName = "TestHoodieTable";
+ streamerConf.partitionPathField = "partition";
+ streamerConf.tableType = "COPY_ON_WRITE";
+ streamerConf.checkpointInterval = 4000L;
+ return streamerConf;
+ }
+
+ /**
+ * Creates the tool to build hoodie table DDL.
+ */
+ public static Sql sql(String tableName) {
+ return new Sql(tableName);
+ }
+
+ public static Catalog catalog(String catalogName) {
+ return new Catalog(catalogName);
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ /**
+ * Tool to build hoodie table DDL with schema {@link #TABLE_SCHEMA}.
+ */
+ public static class Sql {
+ private final Map<String, String> options;
+ private final String tableName;
+ private List<String> fields = new ArrayList<>();
+ private boolean withPartition = true;
+ private String pkField = "uuid";
+ private String partitionField = "partition";
+
+ public Sql(String tableName) {
+ options = new HashMap<>();
+ this.tableName = tableName;
+ }
+
+ public Sql option(ConfigOption<?> option, Object val) {
+ this.options.put(option.key(), val.toString());
+ return this;
+ }
+
+ public Sql option(String key, Object val) {
+ this.options.put(key, val.toString());
+ return this;
+ }
+
+ public Sql options(Map<String, String> options) {
+ this.options.putAll(options);
+ return this;
+ }
+
+ public Sql noPartition() {
+ this.withPartition = false;
+ return this;
+ }
+
+ public Sql pkField(String pkField) {
+ this.pkField = pkField;
+ return this;
+ }
+
+ public Sql partitionField(String partitionField) {
+ this.partitionField = partitionField;
+ return this;
+ }
+
+ public Sql field(String fieldSchema) {
+ fields.add(fieldSchema);
+ return this;
+ }
+
+ public String end() {
+ if (this.fields.size() == 0) {
+ this.fields = FIELDS;
+ }
+ return QuickstartConfigurations.getCreateHoodieTableDDL(this.tableName, this.fields, options,
+ this.withPartition, this.pkField, this.partitionField);
+ }
+ }
+
+ public static class Catalog {
+ private final String catalogName;
+ private String catalogPath = ".";
+
+ public Catalog(String catalogName) {
+ this.catalogName = catalogName;
+ }
+
+ public Catalog catalogPath(String catalogPath) {
+ this.catalogPath = catalogPath;
+ return this;
+ }
+
+ public String end() {
+ return QuickstartConfigurations.getCreateHudiCatalogDDL(catalogName, catalogPath);
+ }
+ }
+}
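
For reference, a minimal sketch of the Sql builder above, assuming the FlinkOptions and HoodieTableType imports already used in this module; the table path is a placeholder:

    // Builds the CREATE TABLE statement that HoodieFlinkQuickstart.createHudiTable executes.
    String ddl = QuickstartConfigurations.sql("t1")
        .option(FlinkOptions.PATH, "file:///tmp/hudi-t1")               // placeholder table path
        .option(FlinkOptions.READ_AS_STREAMING, true)
        .option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ) // or COPY_ON_WRITE
        .end();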
diff --git a/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/utils/SchemaBuilder.java b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/utils/SchemaBuilder.java
new file mode 100644
index 0000000000000..76306f780646d
--- /dev/null
+++ b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/utils/SchemaBuilder.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.examples.quickstart.utils;
+
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.WatermarkSpec;
+import org.apache.flink.table.types.DataType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Builder for {@link ResolvedSchema}.
+ */
+public class SchemaBuilder {
+ private List<Column> columns;
+ private List<WatermarkSpec> watermarkSpecs;
+ private UniqueConstraint constraint;
+
+ public static SchemaBuilder instance() {
+ return new SchemaBuilder();
+ }
+
+ private SchemaBuilder() {
+ this.columns = new ArrayList<>();
+ this.watermarkSpecs = new ArrayList<>();
+ }
+
+ public SchemaBuilder field(String name, DataType type) {
+ this.columns.add(Column.physical(name, type));
+ return this;
+ }
+
+ public SchemaBuilder fields(List<String> names, List<DataType> types) {
+ List<Column> columns = IntStream.range(0, names.size())
+ .mapToObj(idx -> Column.physical(names.get(idx), types.get(idx)))
+ .collect(Collectors.toList());
+ this.columns.addAll(columns);
+ return this;
+ }
+
+ public SchemaBuilder primaryKey(String... columns) {
+ this.constraint = UniqueConstraint.primaryKey("pk", Arrays.asList(columns));
+ return this;
+ }
+
+ public ResolvedSchema build() {
+ return new ResolvedSchema(columns, watermarkSpecs, constraint);
+ }
+}
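
For reference, a short sketch of the builder above, assuming the Flink DataTypes, DataType, and ResolvedSchema imports already used in this module; it reproduces the quickstart schema field by field:

    // Equivalent to QuickstartConfigurations.TABLE_SCHEMA, built with field() instead of fields().
    ResolvedSchema schema = SchemaBuilder.instance()
        .field("uuid", DataTypes.VARCHAR(20))
        .field("name", DataTypes.VARCHAR(10))
        .field("age", DataTypes.INT())
        .field("ts", DataTypes.TIMESTAMP(3))
        .field("partition", DataTypes.VARCHAR(10))
        .build();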
diff --git a/hudi-examples/hudi-examples-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/hudi-examples/hudi-examples-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000000..27a137292b388
--- /dev/null
+++ b/hudi-examples/hudi-examples-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hudi.examples.quickstart.factory.ContinuousFileSourceFactory
+org.apache.hudi.examples.quickstart.factory.CollectSinkTableFactory
diff --git a/hudi-examples/hudi-examples-flink/src/main/resources/source-file.json b/hudi-examples/hudi-examples-flink/src/main/resources/source-file.json
new file mode 100644
index 0000000000000..2f628e29c535b
--- /dev/null
+++ b/hudi-examples/hudi-examples-flink/src/main/resources/source-file.json
@@ -0,0 +1,8 @@
+{"uuid": "id1", "name": "Danny", "age": 23, "ts": "1970-01-01T00:00:01", "partition": "par1"}
+{"uuid": "id2", "name": "Stephen", "age": 33, "ts": "1970-01-01T00:00:02", "partition": "par1"}
+{"uuid": "id3", "name": "Julian", "age": 53, "ts": "1970-01-01T00:00:03", "partition": "par2"}
+{"uuid": "id4", "name": "Fabian", "age": 31, "ts": "1970-01-01T00:00:04", "partition": "par2"}
+{"uuid": "id5", "name": "Sophia", "age": 18, "ts": "1970-01-01T00:00:05", "partition": "par3"}
+{"uuid": "id6", "name": "Emma", "age": 20, "ts": "1970-01-01T00:00:06", "partition": "par3"}
+{"uuid": "id7", "name": "Bob", "age": 44, "ts": "1970-01-01T00:00:07", "partition": "par4"}
+{"uuid": "id8", "name": "Han", "age": 56, "ts": "1970-01-01T00:00:08", "partition": "par4"}
diff --git a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieFlinkQuickstart.java b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieFlinkQuickstart.java
new file mode 100644
index 0000000000000..3ac5f43c48652
--- /dev/null
+++ b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieFlinkQuickstart.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.examples.quickstart;
+
+import static org.apache.hudi.examples.quickstart.TestQuickstartData.assertRowsEquals;
+import java.io.File;
+import java.util.List;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * IT cases for {@link HoodieFlinkQuickstart}.
+ */
+public class TestHoodieFlinkQuickstart extends AbstractTestBase {
+ private final HoodieFlinkQuickstart flinkQuickstart = HoodieFlinkQuickstart.instance();
+
+ @BeforeEach
+ void beforeEach() {
+ flinkQuickstart.initEnv();
+ }
+
+ @TempDir
+ File tempFile;
+
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ void testHoodieFlinkQuickstart(HoodieTableType tableType) throws Exception {
+ // create filesystem table named source
+ flinkQuickstart.createFileSource();
+
+ // create hudi table
+ flinkQuickstart.createHudiTable(tempFile.getAbsolutePath(), "t1", tableType);
+
+ // insert data
+ List<Row> rows = flinkQuickstart.insertData();
+ assertRowsEquals(rows, TestQuickstartData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
+
+ // query data
+ List<Row> rows1 = flinkQuickstart.queryData();
+ assertRowsEquals(rows1, TestQuickstartData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
+
+ // update data
+ List<Row> rows2 = flinkQuickstart.updateData();
+ assertRowsEquals(rows2, TestQuickstartData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
+ }
+}
diff --git a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
new file mode 100644
index 0000000000000..b0d7e79814349
--- /dev/null
+++ b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.examples.quickstart;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.data.writer.BinaryWriter;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.examples.quickstart.utils.QuickstartConfigurations;
+import org.apache.parquet.Strings;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+
+/**
+ * Data set for testing, also some utilities to check the results.
+ */
+public class TestQuickstartData {
+
+ public static List<RowData> DATA_SET_INSERT_DUPLICATES = new ArrayList<>();
+
+ static {
+ IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_DUPLICATES.add(
+ insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(1), StringData.fromString("par1"))));
+ }
+
+ public static List<RowData> DATA_SET_INSERT_SAME_KEY = new ArrayList<>();
+
+ static {
+ IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_SAME_KEY.add(
+ insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(i), StringData.fromString("par1"))));
+ }
+
+ // data set from source-file.json that belongs to the latest commit.
+ public static List<RowData> DATA_SET_SOURCE_INSERT_LATEST_COMMIT = Arrays.asList(
+ insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
+ TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
+ insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20,
+ TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
+ insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
+ TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
+ insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
+ TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
+ );
+
+ public static List<RowData> DATA_SET_DISORDER_UPDATE_DELETE = Arrays.asList(
+ // DISORDER UPDATE
+ updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
+ TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
+ updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20,
+ TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+ updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
+ updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20,
+ TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+ updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
+ TimestampData.fromEpochMillis(4), StringData.fromString("par1")),
+ updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
+ TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
+ // DISORDER DELETE
+ deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
+ TimestampData.fromEpochMillis(2), StringData.fromString("par1"))
+ );
+
+ public static List<RowData> dataSetInsert(int... ids) {
+ List<RowData> inserts = new ArrayList<>();
+ Arrays.stream(ids).forEach(i -> inserts.add(
+ insertRow(StringData.fromString("id" + i), StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(i), StringData.fromString("par1"))));
+ return inserts;
+ }
+
+ private static Integer toIdSafely(Object id) {
+ if (id == null) {
+ return -1;
+ }
+ final String idStr = id.toString();
+ if (idStr.startsWith("id")) {
+ return Integer.parseInt(idStr.substring(2));
+ }
+ return -1;
+ }
+
+ /**
+ * Returns string format of a list of RowData.
+ */
+ public static String rowDataToString(List<RowData> rows) {
+ DataStructureConverter