From da2530bd4ac745546c6c2ffb549195924ee1d95a Mon Sep 17 00:00:00 2001 From: XuQianJin-Stars Date: Mon, 21 Mar 2022 16:42:25 +0800 Subject: [PATCH] [HUDI-3563] Make quickstart examples covered by CI tests --- .github/workflows/bot.yml | 59 ++- azure-pipelines.yml | 5 +- hudi-examples/hudi-examples-common/pom.xml | 109 +++++ .../common/HoodieExampleDataGenerator.java | 3 +- hudi-examples/hudi-examples-flink/pom.xml | 364 +++++++++++++++ .../quickstart/HoodieFlinkQuickstart.java | 211 +++++++++ .../factory/CollectSinkTableFactory.java | 178 ++++++++ .../factory/ContinuousFileSourceFactory.java | 72 +++ .../source/ContinuousFileSource.java | 185 ++++++++ .../utils/QuickstartConfigurations.java | 317 +++++++++++++ .../quickstart/utils/SchemaBuilder.java | 71 +++ .../org.apache.flink.table.factories.Factory | 18 + .../src/main/resources/source-file.json | 8 + .../quickstart/TestHoodieFlinkQuickstart.java | 68 +++ .../quickstart/TestQuickstartData.java | 422 ++++++++++++++++++ .../org.apache.flink.table.factories.Factory | 18 + .../resources/log4j-surefire-quiet.properties | 30 ++ .../test/resources/log4j-surefire.properties | 31 ++ hudi-examples/hudi-examples-java/pom.xml | 129 ++++++ .../java/HoodieJavaWriteClientExample.java | 0 hudi-examples/hudi-examples-spark/pom.xml | 283 ++++++++++++ .../common/ExampleDataSchemaProvider.java | 1 - .../common/HoodieExampleSparkUtils.java | 0 .../examples/common/IdentityTransformer.java | 0 .../examples/common/RandomJsonSource.java | 0 .../quickstart/HoodieSparkQuickstart.java | 227 ++++++++++ .../examples/quickstart}/QuickstartUtils.java | 4 +- .../spark/HoodieSparkBootstrapExample.java | 0 .../spark/HoodieWriteClientExample.java | 0 .../dfs/source-file.json | 0 .../kafka/kafka-source.properties | 0 .../spark/HoodieDataSourceExample.scala | 6 +- .../spark/HoodieMorCompactionJob.scala | 3 +- .../quickstart/TestHoodieSparkQuickstart.java | 115 +++++ .../quickstart}/TestQuickstartUtils.java | 2 +- .../resources/log4j-surefire-quiet.properties | 30 ++ .../test/resources/log4j-surefire.properties | 31 ++ hudi-examples/pom.xml | 205 +-------- 38 files changed, 2980 insertions(+), 225 deletions(-) create mode 100644 hudi-examples/hudi-examples-common/pom.xml rename hudi-examples/{ => hudi-examples-common}/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java (99%) create mode 100644 hudi-examples/hudi-examples-flink/pom.xml create mode 100644 hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/HoodieFlinkQuickstart.java create mode 100644 hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/factory/CollectSinkTableFactory.java create mode 100644 hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/factory/ContinuousFileSourceFactory.java create mode 100644 hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/source/ContinuousFileSource.java create mode 100644 hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/utils/QuickstartConfigurations.java create mode 100644 hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/utils/SchemaBuilder.java create mode 100644 hudi-examples/hudi-examples-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory create mode 100644 hudi-examples/hudi-examples-flink/src/main/resources/source-file.json create mode 100644 
hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieFlinkQuickstart.java create mode 100644 hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java create mode 100644 hudi-examples/hudi-examples-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory create mode 100644 hudi-examples/hudi-examples-flink/src/test/resources/log4j-surefire-quiet.properties create mode 100644 hudi-examples/hudi-examples-flink/src/test/resources/log4j-surefire.properties create mode 100644 hudi-examples/hudi-examples-java/pom.xml rename hudi-examples/{ => hudi-examples-java}/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java (100%) create mode 100644 hudi-examples/hudi-examples-spark/pom.xml rename hudi-examples/{ => hudi-examples-spark}/src/main/java/org/apache/hudi/examples/common/ExampleDataSchemaProvider.java (99%) rename hudi-examples/{ => hudi-examples-spark}/src/main/java/org/apache/hudi/examples/common/HoodieExampleSparkUtils.java (100%) rename hudi-examples/{ => hudi-examples-spark}/src/main/java/org/apache/hudi/examples/common/IdentityTransformer.java (100%) rename hudi-examples/{ => hudi-examples-spark}/src/main/java/org/apache/hudi/examples/common/RandomJsonSource.java (100%) create mode 100644 hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java rename {hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi => hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart}/QuickstartUtils.java (99%) rename hudi-examples/{ => hudi-examples-spark}/src/main/java/org/apache/hudi/examples/spark/HoodieSparkBootstrapExample.java (100%) rename hudi-examples/{ => hudi-examples-spark}/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java (100%) rename hudi-examples/{ => hudi-examples-spark}/src/main/resources/delta-streamer-config/dfs/source-file.json (100%) rename hudi-examples/{ => hudi-examples-spark}/src/main/resources/delta-streamer-config/kafka/kafka-source.properties (100%) rename hudi-examples/{ => hudi-examples-spark}/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala (98%) rename hudi-examples/{ => hudi-examples-spark}/src/main/scala/org/apache/hudi/examples/spark/HoodieMorCompactionJob.scala (98%) create mode 100644 hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java rename {hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi => hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart}/TestQuickstartUtils.java (97%) create mode 100644 hudi-examples/hudi-examples-spark/src/test/resources/log4j-surefire-quiet.properties create mode 100644 hudi-examples/hudi-examples-spark/src/test/resources/log4j-surefire.properties diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml index ebf3caccd9c62..b1ec4456adcca 100644 --- a/.github/workflows/bot.yml +++ b/.github/workflows/bot.yml @@ -14,20 +14,39 @@ jobs: build: runs-on: ubuntu-latest strategy: + max-parallel: 7 matrix: include: - - scala: "scala-2.11" - spark: "spark2" - - scala: "scala-2.11" - spark: "spark2,spark-shade-unbundle-avro" - - scala: "scala-2.12" - spark: "spark3.1.x" - - scala: "scala-2.12" - spark: "spark3.1.x,spark-shade-unbundle-avro" - - scala: "scala-2.12" - spark: "spark3" - - scala: "scala-2.12" - spark: "spark3,spark-shade-unbundle-avro" + # 
Spark 2.4.4 + - scalaProfile: "scala-2.11" + sparkProfile: "spark2" + sparkVersion: "2.4.4" + + # Spark 3.1.x + - scalaProfile: "scala-2.12" + sparkProfile: "spark3.1.x" + sparkVersion: "3.1.0" + + - scalaProfile: "scala-2.12" + sparkProfile: "spark3.1.x" + sparkVersion: "3.1.1" + + - scalaProfile: "scala-2.12" + sparkProfile: "spark3.1.x" + sparkVersion: "3.1.2" + + - scalaProfile: "scala-2.12" + sparkProfile: "spark3.1.x" + sparkVersion: "3.1.3" + + # Spark 3.2.x + - scalaProfile: "scala-2.12" + sparkProfile: "spark3.2.0" + sparkVersion: "3.2.0" + + - scalaProfile: "scala-2.12" + sparkProfile: "spark3" + sparkVersion: "3.2.1" steps: - uses: actions/checkout@v2 - name: Set up JDK 8 @@ -38,6 +57,16 @@ jobs: architecture: x64 - name: Build Project env: - SCALA_PROFILE: ${{ matrix.scala }} - SPARK_PROFILE: ${{ matrix.spark }} - run: mvn install -P "$SCALA_PROFILE,$SPARK_PROFILE" -DskipTests=true -Dmaven.javadoc.skip=true -B -V + SCALA_PROFILE: ${{ matrix.scalaProfile }} + SPARK_PROFILE: ${{ matrix.sparkProfile }} + SPARK_VERSION: ${{ matrix.sparkVersion }} + run: + mvn -T 2.5C clean install -P "$SCALA_PROFILE,$SPARK_PROFILE" -Dspark.version="$SPARK_VERSION" -DskipTests=true -Dmaven.javadoc.skip=true -B -V + - name: Quickstart Test + env: + SCALA_PROFILE: ${{ matrix.scalaProfile }} + SPARK_PROFILE: ${{ matrix.sparkProfile }} + SPARK_VERSION: ${{ matrix.sparkVersion }} + if: ${{ !startsWith(env.SPARK_VERSION, '3.2.') }} # skip test spark 3.2 before hadoop upgrade to 3.x + run: + mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dspark.version="$SPARK_VERSION" -DfailIfNoTests=false -pl hudi-examples/hudi-examples-flink,hudi-examples/hudi-examples-java,hudi-examples/hudi-examples-spark diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 5f21b29bb6adf..f1b25db20473f 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -26,6 +26,7 @@ variables: SPARK_VERSION: '2.4.4' HADOOP_VERSION: '2.7' SPARK_ARCHIVE: spark-$(SPARK_VERSION)-bin-hadoop$(HADOOP_VERSION) + EXCLUDE_TESTED_MODULES: '!hudi-examples/hudi-examples-common,!hudi-examples/hudi-examples-flink,!hudi-examples/hudi-examples-java,!hudi-examples/hudi-examples-spark,!hudi-common,!hudi-flink-datasource/hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync' stages: - stage: test @@ -132,7 +133,7 @@ stages: inputs: mavenPomFile: 'pom.xml' goals: 'test' - options: -Punit-tests -pl !hudi-common,!hudi-flink-datasource/hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync + options: -Punit-tests -pl $(EXCLUDE_TESTED_MODULES) publishJUnitResults: false jdkVersionOption: '1.8' mavenOptions: '-Xmx2g $(MAVEN_OPTS)' @@ -141,7 +142,7 @@ stages: inputs: mavenPomFile: 'pom.xml' goals: 'test' - options: -Pfunctional-tests -pl !hudi-common,!hudi-flink-datasource/hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync + options: -Pfunctional-tests -pl $(EXCLUDE_TESTED_MODULES) publishJUnitResults: false jdkVersionOption: '1.8' mavenOptions: '-Xmx2g $(MAVEN_OPTS)' diff --git a/hudi-examples/hudi-examples-common/pom.xml b/hudi-examples/hudi-examples-common/pom.xml new file mode 100644 index 0000000000000..9c327672226bd --- /dev/null +++ 
b/hudi-examples/hudi-examples-common/pom.xml @@ -0,0 +1,109 @@ + + + + + hudi-examples + org.apache.hudi + 0.11.0-SNAPSHOT + + 4.0.0 + + hudi-examples-common + + + ${project.parent.basedir} + true + + + + + + src/main/resources + + + + + + net.alchim31.maven + scala-maven-plugin + + + scala-compile-first + process-resources + + add-source + compile + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + compile + + compile + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + test-compile + + + + false + + + + org.apache.rat + apache-rat-plugin + + + + + + + org.apache.hudi + hudi-common + ${project.version} + + + + + org.apache.avro + avro + + + + org.apache.parquet + parquet-avro + + + diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java b/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java similarity index 99% rename from hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java rename to hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java index 78df2e78e7081..2c94c8bc43d44 100644 --- a/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java +++ b/hudi-examples/hudi-examples-common/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java @@ -43,7 +43,6 @@ import java.util.stream.IntStream; import java.util.stream.Stream; - /** * Class to be used to generate test data. */ @@ -63,7 +62,7 @@ public class HoodieExampleDataGenerator> { + "{\"name\":\"fare\",\"type\": \"double\"}]}"; public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); - private static Random rand = new Random(46474747); + private static final Random rand = new Random(46474747); private final Map existingKeys; private final String[] partitionPaths; diff --git a/hudi-examples/hudi-examples-flink/pom.xml b/hudi-examples/hudi-examples-flink/pom.xml new file mode 100644 index 0000000000000..7aa6c05d5e382 --- /dev/null +++ b/hudi-examples/hudi-examples-flink/pom.xml @@ -0,0 +1,364 @@ + + + + + hudi-examples + org.apache.hudi + 0.11.0-SNAPSHOT + + 4.0.0 + + hudi-examples-flink + + + ${project.parent.basedir} + true + 1.11.1 + + + + + + org.jacoco + jacoco-maven-plugin + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-jar-plugin + 3.1.2 + + + + test-jar + + + + + + org.apache.rat + apache-rat-plugin + + + + + + src/main/resources + + + src/test/resources + + + + + + + + org.apache.hudi + hudi-common + ${project.version} + + + org.apache.hudi + hudi-client-common + ${project.version} + + + org.apache.hudi + hudi-flink-client + ${project.version} + + + org.apache.hudi + hudi-hadoop-mr + ${project.version} + + + org.apache.hudi + hudi-hive-sync + ${project.version} + + + org.apache.hudi + hudi-sync-common + ${project.version} + + + + org.apache.hudi + hudi-flink + ${project.version} + compile + + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + compile + + + org.apache.flink + flink-clients_${scala.binary.version} + compile + + + com.esotericsoftware.kryo + kryo + + + com.esotericsoftware.minlog + minlog + + + + + org.apache.flink + flink-connector-kafka_${scala.binary.version} + compile + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + org.apache.flink + flink-hadoop-compatibility_${scala.binary.version} + 
${flink.version} + + + org.apache.flink + flink-parquet_${scala.binary.version} + ${flink.version} + provided + + + org.apache.flink + flink-json + ${flink.version} + provided + + + org.apache.flink + flink-table-common + ${flink.version} + provided + + + org.apache.flink + flink-table-runtime_${scala.binary.version} + ${flink.version} + provided + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + provided + + + org.apache.flink + flink-statebackend-rocksdb_${scala.binary.version} + ${flink.version} + provided + + + + org.apache.parquet + parquet-hadoop + ${parquet.version} + + + org.xerial.snappy + snappy-java + + + + + + + org.apache.avro + avro + + 1.10.0 + compile + + + + + org.apache.hadoop + hadoop-mapreduce-client-core + compile + + + org.slf4j + slf4j-log4j12 + + + + + + com.beust + jcommander + compile + + + com.twitter + bijection-avro_${scala.binary.version} + 0.9.7 + + + joda-time + joda-time + 2.5 + + + + ${hive.groupid} + hive-exec + ${hive.version} + ${hive.exec.classifier} + + + javax.mail + mail + + + org.eclipse.jetty.aggregate + * + + + + + + + + + org.junit.jupiter + junit-jupiter-api + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.junit.vintage + junit-vintage-engine + test + + + org.junit.jupiter + junit-jupiter-params + test + + + + + org.apache.hudi + hudi-common + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-client-common + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-flink-client + ${project.version} + tests + test-jar + test + + + + + + + + + + + org.apache.flink + flink-test-utils_${scala.binary.version} + ${flink.version} + test + + + org.apache.flink + flink-runtime + ${flink.version} + test + test-jar + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + test + test-jar + + + org.apache.flink + flink-table-runtime_${scala.binary.version} + ${flink.version} + test + test-jar + + + org.apache.flink + flink-json + ${flink.version} + test + test-jar + + + org.apache.flink + flink-csv + ${flink.version} + test + + + + + org.apache.parquet + parquet-avro + ${parquet.version} + test + + + diff --git a/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/HoodieFlinkQuickstart.java b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/HoodieFlinkQuickstart.java new file mode 100644 index 0000000000000..b3e105015a58c --- /dev/null +++ b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/HoodieFlinkQuickstart.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.examples.quickstart; + +import static org.apache.hudi.examples.quickstart.utils.QuickstartConfigurations.sql; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.types.Row; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.examples.quickstart.factory.CollectSinkTableFactory; +import org.apache.hudi.examples.quickstart.utils.QuickstartConfigurations; +import org.jetbrains.annotations.NotNull; + +public final class HoodieFlinkQuickstart { + private EnvironmentSettings settings = null; + private TableEnvironment streamTableEnv = null; + + private String tableName; + + private HoodieFlinkQuickstart() { + } + + public static HoodieFlinkQuickstart instance() { + return new HoodieFlinkQuickstart(); + } + + public static void main(String[] args) throws TableNotExistException, InterruptedException { + if (args.length < 3) { + System.err.println("Usage: HoodieWriteClientExample "); + System.exit(1); + } + String tablePath = args[0]; + String tableName = args[1]; + String tableType = args[2]; + + HoodieFlinkQuickstart flinkQuickstart = instance(); + flinkQuickstart.initEnv(); + + // create filesystem table named source + flinkQuickstart.createFileSource(); + + // create hudi table + flinkQuickstart.createHudiTable(tablePath, tableName, HoodieTableType.valueOf(tableType)); + + // insert data + flinkQuickstart.insertData(); + + // query data + flinkQuickstart.queryData(); + + // update data + flinkQuickstart.updateData(); + } + + public void initEnv() { + if (this.streamTableEnv == null) { + settings = EnvironmentSettings.newInstance().build(); + TableEnvironment streamTableEnv = TableEnvironmentImpl.create(settings); + streamTableEnv.getConfig().getConfiguration() + .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + Configuration execConf = streamTableEnv.getConfig().getConfiguration(); + execConf.setString("execution.checkpointing.interval", "2s"); + // configure not to retry after failure + execConf.setString("restart-strategy", "fixed-delay"); + execConf.setString("restart-strategy.fixed-delay.attempts", "0"); + this.streamTableEnv = streamTableEnv; + } + } + + public TableEnvironment getStreamTableEnv() { + return streamTableEnv; + } + + public TableEnvironment getBatchTableEnv() { + Configuration conf = new Configuration(); + // for batch upsert use cases: current suggestion is to disable these 2 options, + // from 1.14, flink runtime execution mode has switched from streaming + // to batch for batch execution mode(before that, both streaming and batch use streaming execution mode), + // current batch 
execution mode has these limitations: + // + // 1. the keyed stream default to always sort the inputs by key; + // 2. the batch state-backend requires the inputs sort by state key + // + // For our hudi batch pipeline upsert case, we rely on the consuming sequence for index records and data records, + // the index records must be loaded first before data records for BucketAssignFunction to keep upsert semantics correct, + // so we suggest disabling these 2 options to use streaming state-backend for batch execution mode + // to keep the strategy before 1.14. + conf.setBoolean("execution.sorted-inputs.enabled", false); + conf.setBoolean("execution.batch-state-backend.enabled", false); + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(conf); + settings = EnvironmentSettings.newInstance().inBatchMode().build(); + TableEnvironment batchTableEnv = StreamTableEnvironment.create(execEnv, settings); + batchTableEnv.getConfig().getConfiguration() + .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + return batchTableEnv; + } + + public void createHudiTable(String tablePath, String tableName, + HoodieTableType tableType) { + this.tableName = tableName; + + // create hudi table + String hoodieTableDDL = sql(tableName) + .option(FlinkOptions.PATH, tablePath) + .option(FlinkOptions.READ_AS_STREAMING, true) + .option(FlinkOptions.TABLE_TYPE, tableType) + .end(); + streamTableEnv.executeSql(hoodieTableDDL); + } + + public void createFileSource() { + // create filesystem table named source + String createSource = QuickstartConfigurations.getFileSourceDDL("source"); + streamTableEnv.executeSql(createSource); + } + + @NotNull List insertData() throws InterruptedException, TableNotExistException { + // insert data + String insertInto = String.format("insert into %s select * from source", tableName); + execInsertSql(streamTableEnv, insertInto); + return queryData(); + } + + List queryData() throws InterruptedException, TableNotExistException { + // query data + // reading from the latest commit instance. + return execSelectSql(streamTableEnv, String.format("select * from %s", tableName), 10); + } + + @NotNull List updateData() throws InterruptedException, TableNotExistException { + // update data + String insertInto = String.format("insert into %s select * from source", tableName); + execInsertSql(getStreamTableEnv(), insertInto); + return queryData(); + } + + public static void execInsertSql(TableEnvironment tEnv, String insert) { + TableResult tableResult = tEnv.executeSql(insert); + // wait to finish + try { + tableResult.getJobClient().get().getJobExecutionResult().get(); + } catch (InterruptedException | ExecutionException ex) { + // ignored + } + } + + public static List execSelectSql(TableEnvironment tEnv, String select, long timeout) + throws InterruptedException, TableNotExistException { + return execSelectSql(tEnv, select, timeout, null); + } + + public static List execSelectSql(TableEnvironment tEnv, String select, long timeout, String sourceTable) + throws InterruptedException, TableNotExistException { + final String sinkDDL; + if (sourceTable != null) { + // use the source table schema as the sink schema if the source table was specified, . 
+ ObjectPath objectPath = new ObjectPath(tEnv.getCurrentDatabase(), sourceTable); + TableSchema schema = tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTable(objectPath).getSchema(); + sinkDDL = QuickstartConfigurations.getCollectSinkDDL("sink", schema); + } else { + sinkDDL = QuickstartConfigurations.getCollectSinkDDL("sink"); + } + return execSelectSql(tEnv, select, sinkDDL, timeout); + } + + public static List execSelectSql(TableEnvironment tEnv, String select, String sinkDDL, long timeout) + throws InterruptedException { + tEnv.executeSql("DROP TABLE IF EXISTS sink"); + tEnv.executeSql(sinkDDL); + TableResult tableResult = tEnv.executeSql("insert into sink " + select); + // wait for the timeout then cancels the job + TimeUnit.SECONDS.sleep(timeout); + tableResult.getJobClient().ifPresent(JobClient::cancel); + tEnv.executeSql("DROP TABLE IF EXISTS sink"); + return CollectSinkTableFactory.RESULT.values().stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } +} diff --git a/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/factory/CollectSinkTableFactory.java b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/factory/CollectSinkTableFactory.java new file mode 100644 index 0000000000000..5687a7c146720 --- /dev/null +++ b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/factory/CollectSinkTableFactory.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.examples.quickstart.factory; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Factory for CollectTableSink. + * + *
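+ * <p>For illustration only, a sink table backed by this factory needs just the {@code collect}
+ * connector; the quickstart builds this DDL via QuickstartConfigurations#getCollectSinkDDL, roughly:
+ * <pre>{@code
+ *   create table sink(
+ *     uuid varchar(20),
+ *     name varchar(10),
+ *     age int,
+ *     ts timestamp(3),
+ *     `partition` varchar(20)
+ *   ) with (
+ *     'connector' = 'collect'
+ *   )
+ * }</pre>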

Note: The CollectTableSink collects all the data of a table into a global collection {@code RESULT}, + * so the tests should executed in single thread and the table name should be the same. + */ +public class CollectSinkTableFactory implements DynamicTableSinkFactory { + public static final String FACTORY_ID = "collect"; + + // global results to collect and query + public static final Map> RESULT = new HashMap<>(); + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + helper.validate(); + + TableSchema schema = context.getCatalogTable().getSchema(); + RESULT.clear(); + return new CollectTableSink(schema, context.getObjectIdentifier().getObjectName()); + } + + @Override + public String factoryIdentifier() { + return FACTORY_ID; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } + + // -------------------------------------------------------------------------------------------- + // Table sinks + // -------------------------------------------------------------------------------------------- + + /** + * Values {@link DynamicTableSink} for testing. + */ + private static class CollectTableSink implements DynamicTableSink { + + private final TableSchema schema; + private final String tableName; + + private CollectTableSink( + TableSchema schema, + String tableName) { + this.schema = schema; + this.tableName = tableName; + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + return ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.DELETE) + .addContainedKind(RowKind.UPDATE_AFTER) + .build(); + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + final DataType rowType = schema.toPhysicalRowDataType(); + final RowTypeInfo rowTypeInfo = (RowTypeInfo) TypeConversions.fromDataTypeToLegacyInfo(rowType); + DataStructureConverter converter = context.createDataStructureConverter(schema.toPhysicalRowDataType()); + return SinkFunctionProvider.of(new CollectSinkFunction(converter, rowTypeInfo)); + } + + @Override + public DynamicTableSink copy() { + return new CollectTableSink(schema, tableName); + } + + @Override + public String asSummaryString() { + return "CollectSink"; + } + } + + static class CollectSinkFunction extends RichSinkFunction implements CheckpointedFunction { + + private static final long serialVersionUID = 1L; + private final DynamicTableSink.DataStructureConverter converter; + private final RowTypeInfo rowTypeInfo; + + protected transient ListState resultState; + protected transient List localResult; + + private int taskID; + + protected CollectSinkFunction(DynamicTableSink.DataStructureConverter converter, RowTypeInfo rowTypeInfo) { + this.converter = converter; + this.rowTypeInfo = rowTypeInfo; + } + + @Override + public void invoke(RowData value, Context context) { + Row row = (Row) converter.toExternal(value); + assert row != null; + row.setKind(value.getRowKind()); + RESULT.get(taskID).add(row); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + this.resultState = context.getOperatorStateStore().getListState( + new ListStateDescriptor<>("sink-results", rowTypeInfo)); + this.localResult = new ArrayList<>(); + if (context.isRestored()) { + for (Row value : resultState.get()) { 
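+ // on restore, re-populate the local buffer from the rows checkpointed in the list state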
+ localResult.add(value); + } + } + this.taskID = getRuntimeContext().getIndexOfThisSubtask(); + synchronized (CollectSinkTableFactory.class) { + RESULT.put(taskID, localResult); + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + resultState.clear(); + resultState.addAll(RESULT.get(taskID)); + } + } +} diff --git a/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/factory/ContinuousFileSourceFactory.java b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/factory/ContinuousFileSourceFactory.java new file mode 100644 index 0000000000000..834fa9f252fd5 --- /dev/null +++ b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/factory/ContinuousFileSourceFactory.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.examples.quickstart.factory; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import java.util.Collections; +import java.util.Set; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.examples.quickstart.source.ContinuousFileSource; + +/** + * Factory for ContinuousFileSource. 
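+ *
+ * <p>For illustration only, a table backed by this factory is declared with the
+ * {@code continuous-file-source} connector; the path below is a placeholder
+ * (the quickstart builds this DDL via QuickstartConfigurations#getFileSourceDDL):
+ * <pre>{@code
+ *   create table source(
+ *     uuid varchar(20),
+ *     name varchar(10),
+ *     age int,
+ *     ts timestamp(3),
+ *     `partition` varchar(20)
+ *   ) with (
+ *     'connector' = 'continuous-file-source',
+ *     'path' = 'file:///path/to/source-file.json',
+ *     'checkpoints' = '2'
+ *   )
+ * }</pre>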
+ */ +public class ContinuousFileSourceFactory implements DynamicTableSourceFactory { + public static final String FACTORY_ID = "continuous-file-source"; + + public static final ConfigOption CHECKPOINTS = ConfigOptions + .key("checkpoints") + .intType() + .defaultValue(2) + .withDescription("Number of checkpoints to write the data set as, default 2"); + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + helper.validate(); + + Configuration conf = (Configuration) helper.getOptions(); + Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() -> + new ValidationException("Option [path] should be not empty."))); + return new ContinuousFileSource(context.getCatalogTable().getResolvedSchema(), path, conf); + } + + @Override + public String factoryIdentifier() { + return FACTORY_ID; + } + + @Override + public Set> requiredOptions() { + return Collections.singleton(FlinkOptions.PATH); + } + + @Override + public Set> optionalOptions() { + return Collections.singleton(CHECKPOINTS); + } +} diff --git a/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/source/ContinuousFileSource.java b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/source/ContinuousFileSource.java new file mode 100644 index 0000000000000..b457a7e6080ab --- /dev/null +++ b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/source/ContinuousFileSource.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.examples.quickstart.source; + +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DataStreamScanProvider; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hudi.examples.quickstart.factory.ContinuousFileSourceFactory.CHECKPOINTS; + +/** + * A continuous file source that can trigger checkpoints continuously. + * + *

+ * <p>It loads the data in the specified file and splits the data into the given number of checkpoint batches.
+ * Say, if you want 4 checkpoints and there are 8 records in the file, the emit strategy is:
+ *
+ * <pre>
+ *   | 2 records | 2 records | 2 records | 2 records |
+ *   | cp1       | cp2       | cp3       | cp4       |
+ * </pre>
+ *
If all the data are flushed out, it waits for the next checkpoint to finish and tear down the source. + */ +public class ContinuousFileSource implements ScanTableSource { + + private final ResolvedSchema tableSchema; + private final Path path; + private final Configuration conf; + + public ContinuousFileSource( + ResolvedSchema tableSchema, + Path path, + Configuration conf) { + this.tableSchema = tableSchema; + this.path = path; + this.conf = conf; + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + return new DataStreamScanProvider() { + + @Override + public boolean isBounded() { + return false; + } + + @Override + public DataStream produceDataStream(StreamExecutionEnvironment execEnv) { + final RowType rowType = (RowType) tableSchema.toSourceRowDataType().getLogicalType(); + JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( + rowType, + InternalTypeInfo.of(rowType), + false, + true, + TimestampFormat.ISO_8601); + + return execEnv.addSource(new BoundedSourceFunction(path, conf.getInteger(CHECKPOINTS))) + .name("continuous_file_source") + .setParallelism(1) + .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)), + InternalTypeInfo.of(rowType)); + } + }; + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } + + @Override + public DynamicTableSource copy() { + return new ContinuousFileSource(this.tableSchema, this.path, this.conf); + } + + @Override + public String asSummaryString() { + return "ContinuousFileSource"; + } + + /** + * Source function that partition the data into given number checkpoints batches. + */ + public static class BoundedSourceFunction implements SourceFunction, CheckpointListener { + private final Path path; + private List dataBuffer; + + private final int checkpoints; + private final AtomicInteger currentCP = new AtomicInteger(0); + + private volatile boolean isRunning = true; + + public BoundedSourceFunction(Path path, int checkpoints) { + this.path = path; + this.checkpoints = checkpoints; + } + + @Override + public void run(SourceContext context) throws Exception { + if (this.dataBuffer == null) { + loadDataBuffer(); + } + int oldCP = this.currentCP.get(); + boolean finish = false; + while (isRunning) { + int batchSize = this.dataBuffer.size() / this.checkpoints; + int start = batchSize * oldCP; + synchronized (context.getCheckpointLock()) { + for (int i = start; i < start + batchSize; i++) { + if (i >= this.dataBuffer.size()) { + finish = true; + break; + // wait for the next checkpoint and exit + } + context.collect(this.dataBuffer.get(i)); + } + } + oldCP++; + while (this.currentCP.get() < oldCP) { + synchronized (context.getCheckpointLock()) { + context.getCheckpointLock().wait(10); + } + } + if (finish || !isRunning) { + return; + } + } + } + + @Override + public void cancel() { + this.isRunning = false; + } + + private void loadDataBuffer() { + try { + this.dataBuffer = Files.readAllLines(Paths.get(this.path.toUri())); + } catch (IOException e) { + throw new RuntimeException("Read file " + this.path + " error", e); + } + } + + @Override + public void notifyCheckpointComplete(long l) { + this.currentCP.incrementAndGet(); + } + } +} diff --git a/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/utils/QuickstartConfigurations.java b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/utils/QuickstartConfigurations.java 
new file mode 100644 index 0000000000000..8dfd9df9eb479 --- /dev/null +++ b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/utils/QuickstartConfigurations.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.examples.quickstart.utils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.examples.quickstart.factory.CollectSinkTableFactory; +import org.apache.hudi.examples.quickstart.factory.ContinuousFileSourceFactory; +import org.apache.hudi.streamer.FlinkStreamerConfig; + +/** + * Configurations for the test. 
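+ *
+ * <p>For illustration only, the quickstart assembles its Hudi table DDL with the {@link Sql}
+ * builder; the path and table type here are placeholders:
+ * <pre>{@code
+ *   String hoodieTableDDL = QuickstartConfigurations.sql("t1")
+ *       .option(FlinkOptions.PATH, "/tmp/hudi/t1")
+ *       .option(FlinkOptions.READ_AS_STREAMING, true)
+ *       .option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ)
+ *       .end();
+ *   tableEnv.executeSql(hoodieTableDDL);
+ * }</pre>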
+ */ +public class QuickstartConfigurations { + private QuickstartConfigurations() { + } + + public static final DataType ROW_DATA_TYPE = DataTypes.ROW( + DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key + DataTypes.FIELD("name", DataTypes.VARCHAR(10)), + DataTypes.FIELD("age", DataTypes.INT()), + DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field + DataTypes.FIELD("partition", DataTypes.VARCHAR(10))) + .notNull(); + + public static final RowType ROW_TYPE = (RowType) ROW_DATA_TYPE.getLogicalType(); + + public static final ResolvedSchema TABLE_SCHEMA = SchemaBuilder.instance() + .fields(ROW_TYPE.getFieldNames(), ROW_DATA_TYPE.getChildren()) + .build(); + + private static final List FIELDS = ROW_TYPE.getFields().stream() + .map(RowType.RowField::asSummaryString).collect(Collectors.toList()); + + public static final DataType ROW_DATA_TYPE_WIDER = DataTypes.ROW( + DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key + DataTypes.FIELD("name", DataTypes.VARCHAR(10)), + DataTypes.FIELD("age", DataTypes.INT()), + DataTypes.FIELD("salary", DataTypes.DOUBLE()), + DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field + DataTypes.FIELD("partition", DataTypes.VARCHAR(10))) + .notNull(); + + public static final RowType ROW_TYPE_WIDER = (RowType) ROW_DATA_TYPE_WIDER.getLogicalType(); + + public static String getCreateHoodieTableDDL(String tableName, Map options) { + return getCreateHoodieTableDDL(tableName, options, true, "partition"); + } + + public static String getCreateHoodieTableDDL( + String tableName, + Map options, + boolean havePartition, + String partitionField) { + return getCreateHoodieTableDDL(tableName, FIELDS, options, havePartition, "uuid", partitionField); + } + + public static String getCreateHoodieTableDDL( + String tableName, + List fields, + Map options, + boolean havePartition, + String pkField, + String partitionField) { + StringBuilder builder = new StringBuilder(); + builder.append("create table ").append(tableName).append("(\n"); + for (String field : fields) { + builder.append(" ").append(field).append(",\n"); + } + builder.append(" PRIMARY KEY(").append(pkField).append(") NOT ENFORCED\n") + .append(")\n"); + if (havePartition) { + builder.append("PARTITIONED BY (`").append(partitionField).append("`)\n"); + } + final String connector = options.computeIfAbsent("connector", k -> "hudi"); + builder.append("with (\n" + + " 'connector' = '").append(connector).append("'"); + options.forEach((k, v) -> builder.append(",\n") + .append(" '").append(k).append("' = '").append(v).append("'")); + builder.append("\n)"); + return builder.toString(); + } + + public static String getCreateHudiCatalogDDL(final String catalogName, final String catalogPath) { + StringBuilder builder = new StringBuilder(); + builder.append("create catalog ").append(catalogName).append(" with (\n"); + builder.append(" 'type' = 'hudi',\n" + + " 'catalog.path' = '").append(catalogPath).append("'"); + builder.append("\n)"); + return builder.toString(); + } + + public static String getFileSourceDDL(String tableName) { + return getFileSourceDDL(tableName, "source-file.json"); + } + + public static String getFileSourceDDL(String tableName, int checkpoints) { + return getFileSourceDDL(tableName, "source-file.json", checkpoints); + } + + public static String getFileSourceDDL(String tableName, String fileName) { + return getFileSourceDDL(tableName, fileName, 2); + } + + public static String getFileSourceDDL(String tableName, String fileName, int checkpoints) { + String 
sourcePath = Objects.requireNonNull(Thread.currentThread() + .getContextClassLoader().getResource(fileName)).toString(); + return "create table " + tableName + "(\n" + + " uuid varchar(20),\n" + + " name varchar(10),\n" + + " age int,\n" + + " ts timestamp(3),\n" + + " `partition` varchar(20)\n" + + ") with (\n" + + " 'connector' = '" + ContinuousFileSourceFactory.FACTORY_ID + "',\n" + + " 'path' = '" + sourcePath + "',\n" + + " 'checkpoints' = '" + checkpoints + "'\n" + + ")"; + } + + public static String getCollectSinkDDL(String tableName) { + return "create table " + tableName + "(\n" + + " uuid varchar(20),\n" + + " name varchar(10),\n" + + " age int,\n" + + " ts timestamp(3),\n" + + " `partition` varchar(20)\n" + + ") with (\n" + + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'" + + ")"; + } + + public static String getCollectSinkDDL(String tableName, TableSchema tableSchema) { + final StringBuilder builder = new StringBuilder("create table " + tableName + "(\n"); + String[] fieldNames = tableSchema.getFieldNames(); + DataType[] fieldTypes = tableSchema.getFieldDataTypes(); + for (int i = 0; i < fieldNames.length; i++) { + builder.append(" `") + .append(fieldNames[i]) + .append("` ") + .append(fieldTypes[i].toString()); + if (i != fieldNames.length - 1) { + builder.append(","); + } + builder.append("\n"); + } + final String withProps = "" + + ") with (\n" + + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'\n" + + ")"; + builder.append(withProps); + return builder.toString(); + } + + public static String getCsvSourceDDL(String tableName, String fileName) { + String sourcePath = Objects.requireNonNull(Thread.currentThread() + .getContextClassLoader().getResource(fileName)).toString(); + return "create table " + tableName + "(\n" + + " uuid varchar(20),\n" + + " name varchar(10),\n" + + " age int,\n" + + " ts timestamp(3),\n" + + " `partition` varchar(20)\n" + + ") with (\n" + + " 'connector' = 'filesystem',\n" + + " 'path' = '" + sourcePath + "',\n" + + " 'format' = 'csv'\n" + + ")"; + } + + public static final RowDataSerializer SERIALIZER = new RowDataSerializer(ROW_TYPE); + + public static Configuration getDefaultConf(String tablePath) { + Configuration conf = new Configuration(); + conf.setString(FlinkOptions.PATH, tablePath); + conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, + Objects.requireNonNull(Thread.currentThread() + .getContextClassLoader().getResource("test_read_schema.avsc")).toString()); + conf.setString(FlinkOptions.TABLE_NAME, "TestHoodieTable"); + conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition"); + return conf; + } + + public static FlinkStreamerConfig getDefaultStreamerConf(String tablePath) { + FlinkStreamerConfig streamerConf = new FlinkStreamerConfig(); + streamerConf.targetBasePath = tablePath; + streamerConf.sourceAvroSchemaPath = Objects.requireNonNull(Thread.currentThread() + .getContextClassLoader().getResource("test_read_schema.avsc")).toString(); + streamerConf.targetTableName = "TestHoodieTable"; + streamerConf.partitionPathField = "partition"; + streamerConf.tableType = "COPY_ON_WRITE"; + streamerConf.checkpointInterval = 4000L; + return streamerConf; + } + + /** + * Creates the tool to build hoodie table DDL. 
+ */ + public static Sql sql(String tableName) { + return new Sql(tableName); + } + + public static Catalog catalog(String catalogName) { + return new Catalog(catalogName); + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + /** + * Tool to build hoodie table DDL with schema {@link #TABLE_SCHEMA}. + */ + public static class Sql { + private final Map options; + private final String tableName; + private List fields = new ArrayList<>(); + private boolean withPartition = true; + private String pkField = "uuid"; + private String partitionField = "partition"; + + public Sql(String tableName) { + options = new HashMap<>(); + this.tableName = tableName; + } + + public Sql option(ConfigOption option, Object val) { + this.options.put(option.key(), val.toString()); + return this; + } + + public Sql option(String key, Object val) { + this.options.put(key, val.toString()); + return this; + } + + public Sql options(Map options) { + this.options.putAll(options); + return this; + } + + public Sql noPartition() { + this.withPartition = false; + return this; + } + + public Sql pkField(String pkField) { + this.pkField = pkField; + return this; + } + + public Sql partitionField(String partitionField) { + this.partitionField = partitionField; + return this; + } + + public Sql field(String fieldSchema) { + fields.add(fieldSchema); + return this; + } + + public String end() { + if (this.fields.size() == 0) { + this.fields = FIELDS; + } + return QuickstartConfigurations.getCreateHoodieTableDDL(this.tableName, this.fields, options, + this.withPartition, this.pkField, this.partitionField); + } + } + + public static class Catalog { + private final String catalogName; + private String catalogPath = "."; + + public Catalog(String catalogName) { + this.catalogName = catalogName; + } + + public Catalog catalogPath(String catalogPath) { + this.catalogPath = catalogPath; + return this; + } + + public String end() { + return QuickstartConfigurations.getCreateHudiCatalogDDL(catalogName, catalogPath); + } + } +} diff --git a/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/utils/SchemaBuilder.java b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/utils/SchemaBuilder.java new file mode 100644 index 0000000000000..76306f780646d --- /dev/null +++ b/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/utils/SchemaBuilder.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.examples.quickstart.utils; + +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.catalog.WatermarkSpec; +import org.apache.flink.table.types.DataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Builder for {@link ResolvedSchema}. + */ +public class SchemaBuilder { + private List columns; + private List watermarkSpecs; + private UniqueConstraint constraint; + + public static SchemaBuilder instance() { + return new SchemaBuilder(); + } + + private SchemaBuilder() { + this.columns = new ArrayList<>(); + this.watermarkSpecs = new ArrayList<>(); + } + + public SchemaBuilder field(String name, DataType type) { + this.columns.add(Column.physical(name, type)); + return this; + } + + public SchemaBuilder fields(List names, List types) { + List columns = IntStream.range(0, names.size()) + .mapToObj(idx -> Column.physical(names.get(idx), types.get(idx))) + .collect(Collectors.toList()); + this.columns.addAll(columns); + return this; + } + + public SchemaBuilder primaryKey(String... columns) { + this.constraint = UniqueConstraint.primaryKey("pk", Arrays.asList(columns)); + return this; + } + + public ResolvedSchema build() { + return new ResolvedSchema(columns, watermarkSpecs, constraint); + } +} diff --git a/hudi-examples/hudi-examples-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/hudi-examples/hudi-examples-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000000..27a137292b388 --- /dev/null +++ b/hudi-examples/hudi-examples-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.hudi.examples.quickstart.factory.ContinuousFileSourceFactory +org.apache.hudi.examples.quickstart.factory.CollectSinkTableFactory diff --git a/hudi-examples/hudi-examples-flink/src/main/resources/source-file.json b/hudi-examples/hudi-examples-flink/src/main/resources/source-file.json new file mode 100644 index 0000000000000..2f628e29c535b --- /dev/null +++ b/hudi-examples/hudi-examples-flink/src/main/resources/source-file.json @@ -0,0 +1,8 @@ +{"uuid": "id1", "name": "Danny", "age": 23, "ts": "1970-01-01T00:00:01", "partition": "par1"} +{"uuid": "id2", "name": "Stephen", "age": 33, "ts": "1970-01-01T00:00:02", "partition": "par1"} +{"uuid": "id3", "name": "Julian", "age": 53, "ts": "1970-01-01T00:00:03", "partition": "par2"} +{"uuid": "id4", "name": "Fabian", "age": 31, "ts": "1970-01-01T00:00:04", "partition": "par2"} +{"uuid": "id5", "name": "Sophia", "age": 18, "ts": "1970-01-01T00:00:05", "partition": "par3"} +{"uuid": "id6", "name": "Emma", "age": 20, "ts": "1970-01-01T00:00:06", "partition": "par3"} +{"uuid": "id7", "name": "Bob", "age": 44, "ts": "1970-01-01T00:00:07", "partition": "par4"} +{"uuid": "id8", "name": "Han", "age": 56, "ts": "1970-01-01T00:00:08", "partition": "par4"} diff --git a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieFlinkQuickstart.java b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieFlinkQuickstart.java new file mode 100644 index 0000000000000..3ac5f43c48652 --- /dev/null +++ b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieFlinkQuickstart.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.examples.quickstart; + +import static org.apache.hudi.examples.quickstart.TestQuickstartData.assertRowsEquals; +import java.io.File; +import java.util.List; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.types.Row; +import org.apache.hudi.common.model.HoodieTableType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +/** + * IT cases for Hoodie table source and sink. 
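+ *
+ * <p>These tests are part of the CI quickstart coverage; locally they can be run with
+ * a command along the lines of:
+ * <pre>{@code
+ *   mvn test -Punit-tests -pl hudi-examples/hudi-examples-flink
+ * }</pre>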
+ */ +public class TestHoodieFlinkQuickstart extends AbstractTestBase { + private final HoodieFlinkQuickstart flinkQuickstart = HoodieFlinkQuickstart.instance(); + + @BeforeEach + void beforeEach() { + flinkQuickstart.initEnv(); + } + + @TempDir + File tempFile; + + @ParameterizedTest + @EnumSource(value = HoodieTableType.class) + void testHoodieFlinkQuickstart(HoodieTableType tableType) throws Exception { + // create filesystem table named source + flinkQuickstart.createFileSource(); + + // create hudi table + flinkQuickstart.createHudiTable(tempFile.getAbsolutePath(), "t1", tableType); + + // insert data + List rows = flinkQuickstart.insertData(); + assertRowsEquals(rows, TestQuickstartData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT); + + // query data + List rows1 = flinkQuickstart.queryData(); + assertRowsEquals(rows1, TestQuickstartData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT); + + // update data + List rows2 = flinkQuickstart.updateData(); + assertRowsEquals(rows2, TestQuickstartData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT); + } +} diff --git a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java new file mode 100644 index 0000000000000..b0d7e79814349 --- /dev/null +++ b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.examples.quickstart; + +import static junit.framework.TestCase.assertEquals; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.data.writer.BinaryRowWriter; +import org.apache.flink.table.data.writer.BinaryWriter; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.config.HoodieCommonConfig; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.examples.quickstart.utils.QuickstartConfigurations; +import org.apache.parquet.Strings; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.hadoop.ParquetReader; + +/** + * Data set for testing, also some utilities to check the results. + */ +public class TestQuickstartData { + + public static List DATA_SET_INSERT_DUPLICATES = new ArrayList<>(); + + static { + IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_DUPLICATES.add( + insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(1), StringData.fromString("par1")))); + } + + public static List DATA_SET_INSERT_SAME_KEY = new ArrayList<>(); + + static { + IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_SAME_KEY.add( + insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(i), StringData.fromString("par1")))); + } + + // data set of source-file.json latest commit. 
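+  // Rows id5..id8 from partitions par3/par4; the epoch-millis timestamps (5000..8000)
+  // correspond to the 1970-01-01T00:00:05 .. 00:00:08 values in source-file.json.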
+ public static List DATA_SET_SOURCE_INSERT_LATEST_COMMIT = Arrays.asList( + insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, + TimestampData.fromEpochMillis(5000), StringData.fromString("par3")), + insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20, + TimestampData.fromEpochMillis(6000), StringData.fromString("par3")), + insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44, + TimestampData.fromEpochMillis(7000), StringData.fromString("par4")), + insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56, + TimestampData.fromEpochMillis(8000), StringData.fromString("par4")) + ); + + public static List DATA_SET_DISORDER_UPDATE_DELETE = Arrays.asList( + // DISORDER UPDATE + updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21, + TimestampData.fromEpochMillis(3), StringData.fromString("par1")), + updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20, + TimestampData.fromEpochMillis(2), StringData.fromString("par1")), + updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(1), StringData.fromString("par1")), + updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20, + TimestampData.fromEpochMillis(2), StringData.fromString("par1")), + updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22, + TimestampData.fromEpochMillis(4), StringData.fromString("par1")), + updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21, + TimestampData.fromEpochMillis(3), StringData.fromString("par1")), + // DISORDER DELETE + deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22, + TimestampData.fromEpochMillis(2), StringData.fromString("par1")) + ); + + public static List dataSetInsert(int... ids) { + List inserts = new ArrayList<>(); + Arrays.stream(ids).forEach(i -> inserts.add( + insertRow(StringData.fromString("id" + i), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(i), StringData.fromString("par1")))); + return inserts; + } + + private static Integer toIdSafely(Object id) { + if (id == null) { + return -1; + } + final String idStr = id.toString(); + if (idStr.startsWith("id")) { + return Integer.parseInt(idStr.substring(2)); + } + return -1; + } + + /** + * Returns string format of a list of RowData. + */ + public static String rowDataToString(List rows) { + DataStructureConverter converter = + DataStructureConverters.getConverter(QuickstartConfigurations.ROW_DATA_TYPE); + return rows.stream() + .sorted(Comparator.comparing(o -> toIdSafely(o.getString(0)))) + .map(row -> converter.toExternal(row).toString()) + .collect(Collectors.toList()).toString(); + } + + private static String toStringSafely(Object obj) { + return obj == null ? "null" : obj.toString(); + } + + /** + * Sort the {@code rows} using field at index 0 and asserts + * it equals with the expected string {@code expected}. + * + * @param rows Actual result rows + * @param expected Expected string of the sorted rows + */ + public static void assertRowsEquals(List rows, String expected) { + assertRowsEquals(rows, expected, false); + } + + /** + * Sort the {@code rows} using field at index 0 and asserts + * it equals with the expected string {@code expected}. 
+ * + * @param rows Actual result rows + * @param expected Expected string of the sorted rows + * @param withChangeFlag Whether compares with change flags + */ + public static void assertRowsEquals(List rows, String expected, boolean withChangeFlag) { + String rowsString = rows.stream() + .sorted(Comparator.comparing(o -> toStringSafely(o.getField(0)))) + .map(row -> { + final String rowStr = row.toString(); + if (withChangeFlag) { + return row.getKind().shortString() + "(" + rowStr + ")"; + } else { + return rowStr; + } + }) + .collect(Collectors.toList()).toString(); + assertThat(rowsString, is(expected)); + } + + /** + * Sort the {@code rows} using field at index {@code orderingPos} and asserts + * it equals with the expected string {@code expected}. + * + * @param rows Actual result rows + * @param expected Expected string of the sorted rows + * @param orderingPos Field position for ordering + */ + public static void assertRowsEquals(List rows, String expected, int orderingPos) { + String rowsString = rows.stream() + .sorted(Comparator.comparing(o -> toStringSafely(o.getField(orderingPos)))) + .collect(Collectors.toList()).toString(); + assertThat(rowsString, is(expected)); + } + + /** + * Sort the {@code rows} using field at index 0 and asserts + * it equals with the expected row data list {@code expected}. + * + * @param rows Actual result rows + * @param expected Expected row data list + */ + public static void assertRowsEquals(List rows, List expected) { + String rowsString = rows.stream() + .sorted(Comparator.comparing(o -> toIdSafely(o.getField(0)))) + .collect(Collectors.toList()).toString(); + assertThat(rowsString, is(rowDataToString(expected))); + } + + /** + * Sort the {@code rows} using field at index 0 and asserts + * it equals with the expected string {@code expected}. + * + * @param rows Actual result rows + * @param expected Expected string of the sorted rows + */ + public static void assertRowDataEquals(List rows, String expected) { + String rowsString = rowDataToString(rows); + assertThat(rowsString, is(expected)); + } + + /** + * Sort the {@code rows} using field at index 0 and asserts + * it equals with the expected row data list {@code expected}. + * + * @param rows Actual result rows + * @param expected Expected row data list + */ + public static void assertRowDataEquals(List rows, List expected) { + String rowsString = rowDataToString(rows); + assertThat(rowsString, is(rowDataToString(expected))); + } + + /** + * Checks the source data set are written as expected. + * + *

Note: Replace it with the Flink reader when it is supported. + * + * @param baseFile The file base to check, should be a directory + * @param expected The expected results mapping, the key should be the partition path + * and value should be values list with the key partition + */ + public static void checkWrittenData(File baseFile, Map expected) throws IOException { + checkWrittenData(baseFile, expected, 4); + } + + /** + * Checks the source data set are written as expected. + * + *

Note: Replace it with the Flink reader when it is supported. + * + * @param baseFile The file base to check, should be a directory + * @param expected The expected results mapping, the key should be the partition path + * and value should be values list with the key partition + * @param partitions The expected partition number + */ + public static void checkWrittenData( + File baseFile, + Map expected, + int partitions) throws IOException { + assert baseFile.isDirectory(); + FileFilter filter = file -> !file.getName().startsWith("."); + File[] partitionDirs = baseFile.listFiles(filter); + assertNotNull(partitionDirs); + assertThat(partitionDirs.length, is(partitions)); + for (File partitionDir : partitionDirs) { + File[] dataFiles = partitionDir.listFiles(filter); + assertNotNull(dataFiles); + File latestDataFile = Arrays.stream(dataFiles) + .max(Comparator.comparing(f -> FSUtils.getCommitTime(f.getName()))) + .orElse(dataFiles[0]); + ParquetReader reader = AvroParquetReader + .builder(new Path(latestDataFile.getAbsolutePath())).build(); + List readBuffer = new ArrayList<>(); + GenericRecord nextRecord = reader.read(); + while (nextRecord != null) { + readBuffer.add(filterOutVariables(nextRecord)); + nextRecord = reader.read(); + } + readBuffer.sort(Comparator.naturalOrder()); + assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName()))); + } + } + + /** + * Checks the MERGE_ON_READ source data are written as expected. + * + *

Note: Replace it with the Flink reader when it is supported. + * + * @param fs The file system + * @param latestInstant The latest committed instant of current table + * @param baseFile The file base to check, should be a directory + * @param expected The expected results mapping, the key should be the partition path + * @param partitions The expected partition number + * @param schema The read schema + */ + public static void checkWrittenDataMOR( + FileSystem fs, + String latestInstant, + File baseFile, + Map expected, + int partitions, + Schema schema) { + assert baseFile.isDirectory() : "Base path should be a directory"; + FileFilter partitionFilter = file -> !file.getName().startsWith("."); + File[] partitionDirs = baseFile.listFiles(partitionFilter); + assertNotNull(partitionDirs); + assertThat(partitionDirs.length, is(partitions)); + for (File partitionDir : partitionDirs) { + File[] dataFiles = partitionDir.listFiles(file -> + file.getName().contains(".log.") && !file.getName().startsWith("..")); + assertNotNull(dataFiles); + HoodieMergedLogRecordScanner scanner = getScanner( + fs, baseFile.getPath(), Arrays.stream(dataFiles).map(File::getAbsolutePath) + .sorted(Comparator.naturalOrder()).collect(Collectors.toList()), + schema, latestInstant); + List readBuffer = scanner.getRecords().values().stream() + .map(hoodieRecord -> { + try { + // in case it is a delete + GenericRecord record = (GenericRecord) hoodieRecord.getData() + .getInsertValue(schema, new Properties()) + .orElse(null); + return record == null ? (String) null : filterOutVariables(record); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .filter(Objects::nonNull) + .sorted(Comparator.naturalOrder()) + .collect(Collectors.toList()); + assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName()))); + } + } + + /** + * Returns the scanner to read avro log files. + */ + private static HoodieMergedLogRecordScanner getScanner( + FileSystem fs, + String basePath, + List logPaths, + Schema readSchema, + String instant) { + return HoodieMergedLogRecordScanner.newBuilder() + .withFileSystem(fs) + .withBasePath(basePath) + .withLogFilePaths(logPaths) + .withReaderSchema(readSchema) + .withLatestInstantTime(instant) + .withReadBlocksLazily(false) + .withReverseReader(false) + .withBufferSize(16 * 1024 * 1024) + .withMaxMemorySizeInBytes(1024 * 1024L) + .withSpillableMapBasePath("/tmp/") + .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()) + .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()) + .build(); + } + + /** + * Filter out the variables like file name. + */ + private static String filterOutVariables(GenericRecord genericRecord) { + List fields = new ArrayList<>(); + fields.add(genericRecord.get("_hoodie_record_key").toString()); + fields.add(genericRecord.get("_hoodie_partition_path").toString()); + fields.add(genericRecord.get("uuid").toString()); + fields.add(genericRecord.get("name").toString()); + fields.add(genericRecord.get("age").toString()); + fields.add(genericRecord.get("ts").toString()); + fields.add(genericRecord.get("partition").toString()); + return Strings.join(fields, ","); + } + + public static BinaryRowData insertRow(Object... fields) { + return insertRow(QuickstartConfigurations.ROW_TYPE, fields); + } + + public static BinaryRowData insertRow(RowType rowType, Object... 
fields) {
+    LogicalType[] types = rowType.getFields().stream().map(RowType.RowField::getType)
+        .toArray(LogicalType[]::new);
+    assertEquals(
+        "Field count inconsistent with type information",
+        fields.length,
+        types.length);
+    BinaryRowData row = new BinaryRowData(fields.length);
+    BinaryRowWriter writer = new BinaryRowWriter(row);
+    writer.reset();
+    for (int i = 0; i < fields.length; i++) {
+      Object field = fields[i];
+      if (field == null) {
+        writer.setNullAt(i);
+      } else {
+        BinaryWriter.write(writer, i, field, types[i], InternalSerializers.create(types[i]));
+      }
+    }
+    writer.complete();
+    return row;
+  }
+
+  private static BinaryRowData deleteRow(Object... fields) {
+    BinaryRowData rowData = insertRow(fields);
+    rowData.setRowKind(RowKind.DELETE);
+    return rowData;
+  }
+
+  private static BinaryRowData updateBeforeRow(Object... fields) {
+    BinaryRowData rowData = insertRow(fields);
+    rowData.setRowKind(RowKind.UPDATE_BEFORE);
+    return rowData;
+  }
+
+  private static BinaryRowData updateAfterRow(Object... fields) {
+    BinaryRowData rowData = insertRow(fields);
+    rowData.setRowKind(RowKind.UPDATE_AFTER);
+    return rowData;
+  }
+}
diff --git a/hudi-examples/hudi-examples-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/hudi-examples/hudi-examples-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000000..27a137292b388
--- /dev/null
+++ b/hudi-examples/hudi-examples-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hudi.examples.quickstart.factory.ContinuousFileSourceFactory
+org.apache.hudi.examples.quickstart.factory.CollectSinkTableFactory
diff --git a/hudi-examples/hudi-examples-flink/src/test/resources/log4j-surefire-quiet.properties b/hudi-examples/hudi-examples-flink/src/test/resources/log4j-surefire-quiet.properties
new file mode 100644
index 0000000000000..2b94ea2903067
--- /dev/null
+++ b/hudi-examples/hudi-examples-flink/src/test/resources/log4j-surefire-quiet.properties
@@ -0,0 +1,30 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +log4j.rootLogger=WARN, CONSOLE +log4j.logger.org.apache.hudi=DEBUG +log4j.logger.org.apache.hadoop.hbase=ERROR + +# CONSOLE is set to be a ConsoleAppender. +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +# CONSOLE uses PatternLayout. +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c %x - %m%n +log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter +log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true +log4j.appender.CONSOLE.filter.a.LevelMin=WARN +log4j.appender.CONSOLE.filter.a.LevelMax=FATAL \ No newline at end of file diff --git a/hudi-examples/hudi-examples-flink/src/test/resources/log4j-surefire.properties b/hudi-examples/hudi-examples-flink/src/test/resources/log4j-surefire.properties new file mode 100644 index 0000000000000..8dcd17f303f6b --- /dev/null +++ b/hudi-examples/hudi-examples-flink/src/test/resources/log4j-surefire.properties @@ -0,0 +1,31 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +log4j.rootLogger=INFO, CONSOLE +log4j.logger.org.apache=INFO +log4j.logger.org.apache.hudi=DEBUG +log4j.logger.org.apache.hadoop.hbase=ERROR + +# A1 is set to be a ConsoleAppender. +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +# A1 uses PatternLayout. 
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n +log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter +log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true +log4j.appender.CONSOLE.filter.a.LevelMin=INFO +log4j.appender.CONSOLE.filter.a.LevelMax=FATAL diff --git a/hudi-examples/hudi-examples-java/pom.xml b/hudi-examples/hudi-examples-java/pom.xml new file mode 100644 index 0000000000000..58ec91edfd6ec --- /dev/null +++ b/hudi-examples/hudi-examples-java/pom.xml @@ -0,0 +1,129 @@ + + + + + hudi-examples + org.apache.hudi + 0.11.0-SNAPSHOT + + 4.0.0 + + hudi-examples-java + + + ${project.parent.basedir} + true + + + + + + src/main/resources + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + prepare-package + + copy-dependencies + + + ${project.build.directory}/lib + true + true + true + + + + + + net.alchim31.maven + scala-maven-plugin + + + scala-compile-first + process-resources + + add-source + compile + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + compile + + compile + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + test-compile + + + + false + + + + org.apache.rat + apache-rat-plugin + + + + + + + org.apache.hudi + hudi-examples-common + ${project.version} + + + + org.apache.hudi + hudi-client-common + ${project.version} + + + + org.apache.hudi + hudi-java-client + ${project.version} + + + diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java b/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java similarity index 100% rename from hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java rename to hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java diff --git a/hudi-examples/hudi-examples-spark/pom.xml b/hudi-examples/hudi-examples-spark/pom.xml new file mode 100644 index 0000000000000..f29ee7b24d007 --- /dev/null +++ b/hudi-examples/hudi-examples-spark/pom.xml @@ -0,0 +1,283 @@ + + + + + hudi-examples + org.apache.hudi + 0.11.0-SNAPSHOT + + 4.0.0 + + hudi-examples-spark + + + ${project.parent.basedir} + true + + + + + + src/main/resources + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + prepare-package + + copy-dependencies + + + ${project.build.directory}/lib + true + true + true + + + + + + net.alchim31.maven + scala-maven-plugin + + + scala-compile-first + process-resources + + add-source + compile + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + compile + + compile + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + test-compile + + + + false + + + + org.apache.rat + apache-rat-plugin + + + + + + + org.scala-lang + scala-library + ${scala.version} + + + + org.apache.hudi + hudi-examples-common + ${project.version} + + + * + * + + + + + + org.apache.hudi + hudi-cli + ${project.version} + + + + org.apache.hudi + hudi-client-common + ${project.version} + + + + org.apache.hudi + hudi-java-client + ${project.version} + + + + org.apache.hudi + hudi-spark-client + ${project.version} + + + + org.apache.hudi + hudi-utilities_${scala.binary.version} + ${project.version} + + + + org.apache.hudi + hudi-spark_${scala.binary.version} + ${project.version} + + + + org.apache.hudi + hudi-spark-common_${scala.binary.version} + 
${project.version} + + + + org.apache.hudi + hudi-hadoop-mr + ${project.version} + + + + org.apache.hudi + hudi-timeline-service + ${project.version} + + + + + org.apache.spark + spark-core_${scala.binary.version} + + + org.apache.spark + spark-sql_${scala.binary.version} + + + org.apache.spark + spark-avro_${scala.binary.version} + + + + + org.apache.parquet + parquet-hadoop + ${parquet.version} + + + + + org.apache.avro + avro + + + + org.apache.parquet + parquet-avro + + + + + ${hive.groupid} + hive-common + + + ${hive.groupid} + hive-exec + ${hive.version} + provided + ${hive.exec.classifier} + + + javax.mail + mail + + + org.eclipse.jetty.aggregate + * + + + + + + + org.junit.jupiter + junit-jupiter-api + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.junit.vintage + junit-vintage-engine + test + + + org.junit.jupiter + junit-jupiter-params + test + + + org.mockito + mockito-junit-jupiter + test + + + + + org.apache.hudi + hudi-client-common + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-spark-client + ${project.version} + tests + test-jar + test + + + diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/common/ExampleDataSchemaProvider.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/ExampleDataSchemaProvider.java similarity index 99% rename from hudi-examples/src/main/java/org/apache/hudi/examples/common/ExampleDataSchemaProvider.java rename to hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/ExampleDataSchemaProvider.java index 4486a4286c43f..c974d9ad73313 100644 --- a/hudi-examples/src/main/java/org/apache/hudi/examples/common/ExampleDataSchemaProvider.java +++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/ExampleDataSchemaProvider.java @@ -23,7 +23,6 @@ import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.spark.api.java.JavaSparkContext; - /** * the example SchemaProvider of example json data from uber. 
*/ diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleSparkUtils.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/HoodieExampleSparkUtils.java similarity index 100% rename from hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleSparkUtils.java rename to hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/HoodieExampleSparkUtils.java diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/common/IdentityTransformer.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/IdentityTransformer.java similarity index 100% rename from hudi-examples/src/main/java/org/apache/hudi/examples/common/IdentityTransformer.java rename to hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/IdentityTransformer.java diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/common/RandomJsonSource.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/RandomJsonSource.java similarity index 100% rename from hudi-examples/src/main/java/org/apache/hudi/examples/common/RandomJsonSource.java rename to hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/RandomJsonSource.java diff --git a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java new file mode 100644 index 0000000000000..1e6aa59c5a829 --- /dev/null +++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hudi.examples.quickstart;
+
+import static org.apache.hudi.config.HoodieWriteConfig.TBL_NAME;
+import static org.apache.spark.sql.SaveMode.Append;
+import static org.apache.spark.sql.SaveMode.Overwrite;
+import java.util.List;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
+import org.apache.hudi.examples.common.HoodieExampleSparkUtils;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+public final class HoodieSparkQuickstart {
+
+  private HoodieSparkQuickstart() {
+  }
+
+  public static void main(String[] args) {
+    if (args.length < 2) {
+      System.err.println("Usage: HoodieSparkQuickstart <tablePath> <tableName>");
+      System.exit(1);
+    }
+    String tablePath = args[0];
+    String tableName = args[1];
+
+    SparkSession spark = HoodieExampleSparkUtils.defaultSparkSession("Hudi Spark basic example");
+    SparkConf sparkConf = HoodieExampleSparkUtils.defaultSparkConf("hoodie-client-example");
+
+    try (JavaSparkContext jsc = new JavaSparkContext(sparkConf)) {
+      final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
+
+      insertData(spark, jsc, tablePath, tableName, dataGen);
+      updateData(spark, jsc, tablePath, tableName, dataGen);
+      queryData(spark, jsc, tablePath, tableName, dataGen);
+
+      incrementalQuery(spark, tablePath, tableName);
+      pointInTimeQuery(spark, tablePath, tableName);
+
+      delete(spark, tablePath, tableName);
+      deleteByPartition(spark, tablePath, tableName);
+    }
+  }
+
+  /**
+   * Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi dataset as below.
+   */
+  public static void insertData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
+                                HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
+    String commitTime = Long.toString(System.currentTimeMillis());
+    List<String> inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
+    Dataset<Row> df = spark.read().json(jsc.parallelize(inserts, 1));
+    df.write().format("org.apache.hudi").
+        options(QuickstartUtils.getQuickstartWriteConfigs()).
+        option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts").
+        option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid").
+        option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath").
+        option(TBL_NAME.key(), tableName).
+        mode(Overwrite).
+        save(tablePath);
+  }
+
+  /**
+   * Load the data files into a DataFrame.
+   */
+  public static void queryData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
+                               HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
+    Dataset<Row> roViewDF = spark.
+        read().
+        format("org.apache.hudi").
+        load(tablePath + "/*/*/*/*");
+
+    roViewDF.createOrReplaceTempView("hudi_ro_table");
+
+    spark.sql("select fare, begin_lon, begin_lat, ts from hudi_ro_table where fare > 20.0").show();
+    //  +-----------------+-------------------+-------------------+---+
+    //  |             fare|          begin_lon|          begin_lat| ts|
+    //  +-----------------+-------------------+-------------------+---+
+    //  |98.88075495133515|0.39556048623031603|0.17851135255091155|0.0|
+    //  ...
+
+    spark.sql(
+        "select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_ro_table")
+        .show();
+    //  +-------------------+--------------------+----------------------+-------------------+--------------------+------------------+
+    //  |_hoodie_commit_time|  _hoodie_record_key|_hoodie_partition_path|              rider|              driver|              fare|
+    //  +-------------------+--------------------+----------------------+-------------------+--------------------+------------------+
+    //  |     20191231181501|31cafb9f-0196-4b1...|            2020/01/02|rider-1577787297889|driver-1577787297889| 98.88075495133515|
+    //  ...
+  }
+
+  /**
+   * This is similar to inserting new data. Generate updates to existing trips using the data generator,
+   * load them into a DataFrame and write the DataFrame into the Hudi dataset.
+   */
+  public static void updateData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
+                                HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
+
+    String commitTime = Long.toString(System.currentTimeMillis());
+    List<String> updates = dataGen.convertToStringList(dataGen.generateUpdates(commitTime, 10));
+    Dataset<Row> df = spark.read().json(jsc.parallelize(updates, 1));
+    df.write().format("org.apache.hudi").
+        options(QuickstartUtils.getQuickstartWriteConfigs()).
+        option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts").
+        option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid").
+        option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath").
+        option(TBL_NAME.key(), tableName).
+        mode(Append).
+        save(tablePath);
+  }
+
+  /**
+   * Delete a couple of existing records, identified by their record key and partition path.
+   */
+  public static void delete(SparkSession spark, String tablePath, String tableName) {
+
+    Dataset<Row> roViewDF = spark.read().format("org.apache.hudi").load(tablePath + "/*/*/*/*");
+    roViewDF.createOrReplaceTempView("hudi_ro_table");
+    Dataset<Row> df = spark.sql("select uuid, partitionpath, ts from hudi_ro_table limit 2");
+
+    df.write().format("org.apache.hudi").
+        options(QuickstartUtils.getQuickstartWriteConfigs()).
+        option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts").
+        option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid").
+        option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath").
+        option(TBL_NAME.key(), tableName).
+        option("hoodie.datasource.write.operation", WriteOperationType.DELETE.value()).
+        mode(Append).
+        save(tablePath);
+  }
+
+  /**
+   * Delete the data of a single or multiple partitions.
+   */
+  public static void deleteByPartition(SparkSession spark, String tablePath, String tableName) {
+    Dataset<Row> df = spark.emptyDataFrame();
+    df.write().format("org.apache.hudi").
+        options(QuickstartUtils.getQuickstartWriteConfigs()).
+        option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts").
+        option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid").
+        option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath").
+        option(TBL_NAME.key(), tableName).
+        option("hoodie.datasource.write.operation", WriteOperationType.DELETE.value()).
+        option("hoodie.datasource.write.partitions.to.delete",
+            ArrayUtils.toString(HoodieExampleDataGenerator.DEFAULT_PARTITION_PATHS, ",")).
+        mode(Append).
+        save(tablePath);
+  }
+
+  /**
+   * Hudi also provides the capability to obtain a stream of records that changed since a given commit timestamp.
+   * This can be achieved using Hudi's incremental view and providing a begin time from which changes need to be streamed.
+   * We do not need to specify endTime if we want all changes after the given commit (as is the common case).
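+   *
+   * <p>The implementation below collects the distinct commit times from the registered
+   * hudi_ro_table view and uses the second-to-last one as the value of
+   * hoodie.datasource.read.begin.instanttime for the incremental read.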
+ */ + public static void incrementalQuery(SparkSession spark, String tablePath, String tableName) { + List commits = + spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime") + .toJavaRDD() + .map((Function) row -> row.getString(0)) + .take(50); + + String beginTime = commits.get(commits.size() - 2); // commit time we are interested in + + // incrementally query data + Dataset incViewDF = spark. + read(). + format("org.apache.hudi"). + option("hoodie.datasource.query.type", "incremental"). + option("hoodie.datasource.read.begin.instanttime", beginTime). + load(tablePath); + + incViewDF.createOrReplaceTempView("hudi_incr_table"); + spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0") + .show(); + } + + /** + * Lets look at how to query data as of a specific time. + * The specific time can be represented by pointing endTime to a specific commit time + * and beginTime to “000” (denoting earliest possible commit time). + */ + public static void pointInTimeQuery(SparkSession spark, String tablePath, String tableName) { + List commits = + spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime") + .toJavaRDD() + .map((Function) row -> row.getString(0)) + .take(50); + String beginTime = "000"; // Represents all commits > this time. + String endTime = commits.get(commits.size() - 2); // commit time we are interested in + + //incrementally query data + Dataset incViewDF = spark.read().format("org.apache.hudi"). + option("hoodie.datasource.query.type", "incremental"). + option("hoodie.datasource.read.begin.instanttime", beginTime). + option("hoodie.datasource.read.end.instanttime", endTime). + load(tablePath); + + incViewDF.createOrReplaceTempView("hudi_incr_table"); + spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0") + .show(); + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/QuickstartUtils.java similarity index 99% rename from hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java rename to hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/QuickstartUtils.java index 9aa7ac1a664cd..2e7084bea36a2 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java +++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/QuickstartUtils.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.hudi; +package org.apache.hudi.examples.quickstart; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieAvroRecord; @@ -246,4 +246,4 @@ public static Map getQuickstartWriteConfigs() { demoConfigs.put("hoodie.delete.shuffle.parallelism", "2"); return demoConfigs; } -} \ No newline at end of file +} diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieSparkBootstrapExample.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieSparkBootstrapExample.java similarity index 100% rename from hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieSparkBootstrapExample.java rename to hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieSparkBootstrapExample.java diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java similarity index 100% rename from hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java rename to hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java diff --git a/hudi-examples/src/main/resources/delta-streamer-config/dfs/source-file.json b/hudi-examples/hudi-examples-spark/src/main/resources/delta-streamer-config/dfs/source-file.json similarity index 100% rename from hudi-examples/src/main/resources/delta-streamer-config/dfs/source-file.json rename to hudi-examples/hudi-examples-spark/src/main/resources/delta-streamer-config/dfs/source-file.json diff --git a/hudi-examples/src/main/resources/delta-streamer-config/kafka/kafka-source.properties b/hudi-examples/hudi-examples-spark/src/main/resources/delta-streamer-config/kafka/kafka-source.properties similarity index 100% rename from hudi-examples/src/main/resources/delta-streamer-config/kafka/kafka-source.properties rename to hudi-examples/hudi-examples-spark/src/main/resources/delta-streamer-config/kafka/kafka-source.properties diff --git a/hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala b/hudi-examples/hudi-examples-spark/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala similarity index 98% rename from hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala rename to hudi-examples/hudi-examples-spark/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala index 77b3885e3cf7a..7de770d709402 100644 --- a/hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala +++ b/hudi-examples/hudi-examples-spark/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala @@ -20,7 +20,7 @@ package org.apache.hudi.examples.spark import org.apache.hudi.DataSourceReadOptions.{BEGIN_INSTANTTIME, END_INSTANTTIME, QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL} import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, PARTITIONS_TO_DELETE, OPERATION, DELETE_PARTITION_OPERATION_OPT_VAL, DELETE_OPERATION_OPT_VAL} -import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs +import org.apache.hudi.examples.quickstart.QuickstartUtils.getQuickstartWriteConfigs import org.apache.hudi.common.model.HoodieAvroPayload import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME import org.apache.hudi.examples.common.{HoodieExampleDataGenerator, 
HoodieExampleSparkUtils} @@ -172,7 +172,7 @@ object HoodieDataSourceExample { * This can be achieved using Hudi’s incremental view and providing a begin time from which changes need to be streamed. * We do not need to specify endTime, if we want all changes after the given commit (as is the common case). */ - def incrementalQuery(spark: SparkSession, tablePath: String, tableName: String) { + def incrementalQuery(spark: SparkSession, tablePath: String, tableName: String): Unit = { import spark.implicits._ val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50) val beginTime = commits(commits.length - 2) // commit time we are interested in @@ -193,7 +193,7 @@ object HoodieDataSourceExample { * The specific time can be represented by pointing endTime to a specific commit time * and beginTime to “000” (denoting earliest possible commit time). */ - def pointInTimeQuery(spark: SparkSession, tablePath: String, tableName: String) { + def pointInTimeQuery(spark: SparkSession, tablePath: String, tableName: String): Unit = { import spark.implicits._ val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50) val beginTime = "000" // Represents all commits > this time. diff --git a/hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieMorCompactionJob.scala b/hudi-examples/hudi-examples-spark/src/main/scala/org/apache/hudi/examples/spark/HoodieMorCompactionJob.scala similarity index 98% rename from hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieMorCompactionJob.scala rename to hudi-examples/hudi-examples-spark/src/main/scala/org/apache/hudi/examples/spark/HoodieMorCompactionJob.scala index 2d96f9c6c39dd..980a29acc05b4 100644 --- a/hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieMorCompactionJob.scala +++ b/hudi-examples/hudi-examples-spark/src/main/scala/org/apache/hudi/examples/spark/HoodieMorCompactionJob.scala @@ -20,7 +20,7 @@ package org.apache.hudi.examples.spark import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE} -import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs +import org.apache.hudi.examples.quickstart.QuickstartUtils.getQuickstartWriteConfigs import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieRecordPayload, HoodieTableType} @@ -55,6 +55,7 @@ object HoodieMorCompactionJob { val dataGen = new HoodieExampleDataGenerator[HoodieAvroPayload] val tablePath = args(0) val tableName = args(1) + insertData(spark, tablePath, tableName, dataGen, HoodieTableType.MERGE_ON_READ.name()) updateData(spark, tablePath, tableName, dataGen, HoodieTableType.MERGE_ON_READ.name()) val cfg = HoodieWriteConfig.newBuilder() diff --git a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java new file mode 100644 index 0000000000000..426f4317daf05 --- /dev/null +++ b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.examples.quickstart; + +import java.io.File; +import java.nio.file.Paths; +import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.examples.common.HoodieExampleDataGenerator; +import org.apache.hudi.testutils.providers.SparkProvider; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.util.Utils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestHoodieSparkQuickstart implements SparkProvider { + protected static transient HoodieSparkEngineContext context; + + private static transient SparkSession spark; + private static transient SQLContext sqlContext; + private static transient JavaSparkContext jsc; + + /** + * An indicator of the initialization status. 
+ */ + protected boolean initialized = false; + @TempDir + protected java.nio.file.Path tempDir; + + private static final HoodieExampleDataGenerator dataGen = new HoodieExampleDataGenerator<>(); + + @Override + public SparkSession spark() { + return spark; + } + + @Override + public SQLContext sqlContext() { + return sqlContext; + } + + @Override + public JavaSparkContext jsc() { + return jsc; + } + + @Override + public HoodieSparkEngineContext context() { + return context; + } + + public String basePath() { + return tempDir.toAbsolutePath().toString(); + } + + public String tablePath(String tableName) { + return Paths.get(basePath(), tableName).toString(); + } + + @BeforeEach + public synchronized void runBeforeEach() { + initialized = spark != null; + if (!initialized) { + SparkConf sparkConf = conf(); + SparkRDDWriteClient.registerClasses(sparkConf); + HoodieReadClient.addHoodieSupport(sparkConf); + spark = SparkSession.builder().config(sparkConf).getOrCreate(); + sqlContext = spark.sqlContext(); + jsc = new JavaSparkContext(spark.sparkContext()); + context = new HoodieSparkEngineContext(jsc); + } + } + + @Test + public void testHoodieSparkQuickstart() { + String tableName = "spark_quick_start"; + String tablePath = tablePath(tableName); + + try { + HoodieSparkQuickstart.insertData(spark, jsc, tablePath, tableName, dataGen); + HoodieSparkQuickstart.updateData(spark, jsc, tablePath, tableName, dataGen); + + HoodieSparkQuickstart.queryData(spark, jsc, tablePath, tableName, dataGen); + HoodieSparkQuickstart.incrementalQuery(spark, tablePath, tableName); + HoodieSparkQuickstart.pointInTimeQuery(spark, tablePath, tableName); + + HoodieSparkQuickstart.delete(spark, tablePath, tableName); + HoodieSparkQuickstart.deleteByPartition(spark, tablePath, tableName); + } finally { + Utils.deleteRecursively(new File(tablePath)); + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestQuickstartUtils.java b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartUtils.java similarity index 97% rename from hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestQuickstartUtils.java rename to hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartUtils.java index 2042249ecd788..8dc010d12cb94 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestQuickstartUtils.java +++ b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartUtils.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi; +package org.apache.hudi.examples.quickstart; import org.apache.hudi.exception.HoodieException; import org.junit.jupiter.api.Assertions; diff --git a/hudi-examples/hudi-examples-spark/src/test/resources/log4j-surefire-quiet.properties b/hudi-examples/hudi-examples-spark/src/test/resources/log4j-surefire-quiet.properties new file mode 100644 index 0000000000000..2b94ea2903067 --- /dev/null +++ b/hudi-examples/hudi-examples-spark/src/test/resources/log4j-surefire-quiet.properties @@ -0,0 +1,30 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +log4j.rootLogger=WARN, CONSOLE +log4j.logger.org.apache.hudi=DEBUG +log4j.logger.org.apache.hadoop.hbase=ERROR + +# CONSOLE is set to be a ConsoleAppender. +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +# CONSOLE uses PatternLayout. +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c %x - %m%n +log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter +log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true +log4j.appender.CONSOLE.filter.a.LevelMin=WARN +log4j.appender.CONSOLE.filter.a.LevelMax=FATAL \ No newline at end of file diff --git a/hudi-examples/hudi-examples-spark/src/test/resources/log4j-surefire.properties b/hudi-examples/hudi-examples-spark/src/test/resources/log4j-surefire.properties new file mode 100644 index 0000000000000..8dcd17f303f6b --- /dev/null +++ b/hudi-examples/hudi-examples-spark/src/test/resources/log4j-surefire.properties @@ -0,0 +1,31 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +log4j.rootLogger=INFO, CONSOLE +log4j.logger.org.apache=INFO +log4j.logger.org.apache.hudi=DEBUG +log4j.logger.org.apache.hadoop.hbase=ERROR + +# A1 is set to be a ConsoleAppender. +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +# A1 uses PatternLayout. 
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n +log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter +log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true +log4j.appender.CONSOLE.filter.a.LevelMin=INFO +log4j.appender.CONSOLE.filter.a.LevelMax=FATAL diff --git a/hudi-examples/pom.xml b/hudi-examples/pom.xml index 2ea284f203209..4386738dfcac0 100644 --- a/hudi-examples/pom.xml +++ b/hudi-examples/pom.xml @@ -25,204 +25,13 @@ 4.0.0 hudi-examples - jar + pom - - ${project.parent.basedir} - true - + + hudi-examples-common + hudi-examples-spark + hudi-examples-flink + hudi-examples-java + - - - - src/main/resources - - - - - - org.apache.maven.plugins - maven-dependency-plugin - - - copy-dependencies - prepare-package - - copy-dependencies - - - ${project.build.directory}/lib - true - true - true - - - - - - net.alchim31.maven - scala-maven-plugin - - - scala-compile-first - process-resources - - add-source - compile - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - - compile - - compile - - - - - - org.apache.maven.plugins - maven-jar-plugin - - - - test-jar - - test-compile - - - - false - - - - org.apache.rat - apache-rat-plugin - - - - - - - - org.scala-lang - scala-library - ${scala.version} - - - - org.apache.hudi - hudi-common - ${project.version} - - - - org.apache.hudi - hudi-cli - ${project.version} - - - - org.apache.hudi - hudi-client-common - ${project.version} - - - - org.apache.hudi - hudi-java-client - ${project.version} - - - - org.apache.hudi - hudi-spark-client - ${project.version} - - - - org.apache.hudi - hudi-utilities_${scala.binary.version} - ${project.version} - - - - org.apache.hudi - hudi-spark_${scala.binary.version} - ${project.version} - - - - org.apache.hudi - hudi-hadoop-mr - ${project.version} - - - - org.apache.hudi - hudi-timeline-service - ${project.version} - - - - - org.apache.spark - spark-core_${scala.binary.version} - - - org.apache.spark - spark-sql_${scala.binary.version} - - - org.apache.spark - spark-avro_${scala.binary.version} - - - - - org.apache.parquet - parquet-hadoop - ${parquet.version} - - - - - org.apache.avro - avro - - - - org.apache.parquet - parquet-avro - - - - - ${hive.groupid} - hive-common - - - ${hive.groupid} - hive-exec - ${hive.version} - provided - ${hive.exec.classifier} - - - javax.mail - mail - - - org.eclipse.jetty.aggregate - * - - - - -
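For readers who want to exercise the relocated quickstart classes outside of the CI matrix above, a minimal driver along the following lines should work; it simply mirrors the sequence of calls that the new TestHoodieSparkQuickstart makes. The class name QuickstartSmokeRun and the /tmp table path are illustrative placeholders and are not part of this patch.

import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
import org.apache.hudi.examples.common.HoodieExampleSparkUtils;
import org.apache.hudi.examples.quickstart.HoodieSparkQuickstart;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public final class QuickstartSmokeRun {
  public static void main(String[] args) {
    // Illustrative placeholders; any writable path and table name will do.
    String tablePath = "file:///tmp/hoodie/quickstart_trips";
    String tableName = "quickstart_trips";

    SparkSession spark = HoodieExampleSparkUtils.defaultSparkSession("quickstart-smoke-run");
    try (JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext())) {
      HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();

      // Same sequence the new Spark quickstart test runs in CI: write paths first,
      // then the read paths (queryData registers the hudi_ro_table view the
      // incremental and point-in-time queries rely on), then the delete paths.
      HoodieSparkQuickstart.insertData(spark, jsc, tablePath, tableName, dataGen);
      HoodieSparkQuickstart.updateData(spark, jsc, tablePath, tableName, dataGen);
      HoodieSparkQuickstart.queryData(spark, jsc, tablePath, tableName, dataGen);
      HoodieSparkQuickstart.incrementalQuery(spark, tablePath, tableName);
      HoodieSparkQuickstart.pointInTimeQuery(spark, tablePath, tableName);
      HoodieSparkQuickstart.delete(spark, tablePath, tableName);
      HoodieSparkQuickstart.deleteByPartition(spark, tablePath, tableName);
    }
  }
}

This is the same flow the CI job exercises through the hudi-examples-spark module's tests; the Flink counterpart is covered analogously by TestHoodieFlinkQuickstart.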