diff --git a/docker/demo/config/test-suite/spark-immutable-to-mutable.yaml b/docker/demo/config/test-suite/spark-immutable-to-mutable.yaml
new file mode 100644
index 0000000000000..4b974c54d21af
--- /dev/null
+++ b/docker/demo/config/test-suite/spark-immutable-to-mutable.yaml
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: spark-immutable-dataset.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 0
+dag_content:
+  first_bulk_insert:
+    config:
+      record_size: 200
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 100
+    type: SparkBulkInsertNode
+    deps: none
+  first_validate:
+    config:
+      validate_hive: false
+      delete_input_data: false
+    type: ValidateDatasetNode
+    deps: first_bulk_insert
+  first_update:
+    config:
+      record_size: 200
+      num_partitions_upsert: 10
+      repeat_count: 1
+      num_records_upsert: 50
+    type: SparkUpsertNode
+    deps: first_validate
+  second_validate:
+    config:
+      validate_hive: false
+      delete_input_data: false
+      validate_full_data: true
+    type: ValidateDatasetNode
+    deps: first_update
+  last_validate:
+    config:
+      execute_itr_count: 1
+      delete_input_data: true
+    type: ValidateAsyncOperations
+    deps: second_validate
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index 8adea6b179804..e7e28d3dec28b 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -340,5 +340,8 @@ public static class HoodieTestSuiteConfig extends HoodieDeltaStreamer.Config {
 
     @Parameter(names = {"--trino-jdbc-password"}, description = "Password corresponding to the username to use for authentication")
     public String trinoPassword;
+
+    @Parameter(names = {"--index-type"}, description = "Index type to use for writes")
+    public String indexType = "SIMPLE";
   }
 }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
index d763115281ce6..75d3fd94101f3 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
@@ -91,6 +91,7 @@ public HoodieWriteConfig getWriteConfig() {
   }
 
   private HoodieWriteConfig getHoodieClientConfig(HoodieTestSuiteJob.HoodieTestSuiteConfig cfg, Properties props, String schema) {
+
     HoodieWriteConfig.Builder builder =
         HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath)
             .withAutoCommit(false)
@@ -99,7 +100,7 @@ private HoodieWriteConfig getHoodieClientConfig(HoodieTestSui
                 .withPayloadClass(cfg.payloadClassName)
                 .build())
             .forTable(cfg.targetTableName)
-            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.valueOf(cfg.indexType)).build())
             .withProps(props);
     builder = builder.withSchema(schema);
     return builder.build();
diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
index bea5ae3d6fdfc..38751bda5a2a6 100644
--- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
+++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
@@ -21,7 +21,7 @@ package org.apache.hudi.integ.testsuite.dag.nodes
 import org.apache.avro.Schema
 import org.apache.hudi.client.WriteStatus
 import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
 import org.apache.hudi.integ.testsuite.dag.ExecutionContext
 import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats
@@ -70,6 +70,7 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
       .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "test_suite_source_ordering_field")
       .option(DataSourceWriteOptions.TABLE_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType)
+      .option(HoodieIndexConfig.INDEX_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.indexType)
       .option(DataSourceWriteOptions.OPERATION.key, getOperation())
       .option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
       .mode(SaveMode.Append)
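
Note (reviewer sketch, not part of the patch): the new --index-type flag is consumed on two paths. getHoodieClientConfig resolves the string through HoodieIndex.IndexType.valueOf for the write-client path, while SparkInsertNode forwards the raw string via HoodieIndexConfig.INDEX_TYPE for the datasource path. Below is a minimal standalone sketch of the valueOf behavior, assuming hudi-client-common is on the classpath; the class name IndexTypeFlagSketch is hypothetical.

import org.apache.hudi.index.HoodieIndex;

public class IndexTypeFlagSketch {
  public static void main(String[] args) {
    // Default mirrors HoodieTestSuiteConfig.indexType introduced by this patch.
    String cliValue = args.length > 0 ? args[0] : "SIMPLE";

    // Same resolution the patched getHoodieClientConfig performs: valueOf is
    // case-sensitive and throws IllegalArgumentException on unknown names,
    // so a bad --index-type value fails fast at config-build time rather
    // than partway through a write.
    HoodieIndex.IndexType indexType = HoodieIndex.IndexType.valueOf(cliValue);
    System.out.println("Resolved index type: " + indexType);
    // e.g. "BLOOM", "GLOBAL_BLOOM", "SIMPLE", "GLOBAL_SIMPLE" parse;
    // "simple" (lowercase) would throw.
  }
}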