diff --git a/docker/demo/config/test-suite/deltastreamer-immutable-dataset.yaml b/docker/demo/config/test-suite/deltastreamer-immutable-dataset.yaml new file mode 100644 index 0000000000000..4903e3650c144 --- /dev/null +++ b/docker/demo/config/test-suite/deltastreamer-immutable-dataset.yaml @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +dag_name: deltastreamer-immutable-dataset.yaml +dag_rounds: 5 +dag_intermittent_delay_mins: 0 +dag_content: + first_bulk_insert: + config: + record_size: 200 + num_partitions_insert: 10 + repeat_count: 3 + num_records_insert: 5000 + type: BulkInsertNode + deps: none + first_validate: + config: + validate_hive: false + delete_input_data: false + type: ValidateDatasetNode + deps: first_bulk_insert + first_insert: + config: + record_size: 200 + num_partitions_insert: 10 + repeat_count: 3 + num_records_insert: 5000 + type: InsertNode + deps: first_validate + second_validate: + config: + validate_hive: false + delete_input_data: false + type: ValidateDatasetNode + deps: first_insert + last_validate: + config: + execute_itr_count: 5 + delete_input_data: true + type: ValidateAsyncOperations + deps: second_validate \ No newline at end of file diff --git a/docker/demo/config/test-suite/deltastreamer-pure-bulk-inserts.yaml b/docker/demo/config/test-suite/deltastreamer-pure-bulk-inserts.yaml new file mode 100644 index 0000000000000..d5342e22b1282 --- /dev/null +++ b/docker/demo/config/test-suite/deltastreamer-pure-bulk-inserts.yaml @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+dag_name: deltastreamer-pure-bulk-inserts.yaml +dag_rounds: 10 +dag_intermittent_delay_mins: 0 +dag_content: + first_bulk_insert: + config: + record_size: 200 + num_partitions_insert: 10 + repeat_count: 3 + num_records_insert: 5000 + type: BulkInsertNode + deps: none + second_validate: + config: + validate_hive: false + delete_input_data: false + type: ValidateDatasetNode + deps: first_bulk_insert + last_validate: + config: + execute_itr_count: 10 + type: ValidateAsyncOperations + deps: second_validate \ No newline at end of file diff --git a/docker/demo/config/test-suite/deltastreamer-pure-inserts.yaml b/docker/demo/config/test-suite/deltastreamer-pure-inserts.yaml new file mode 100644 index 0000000000000..3b209fe5fe016 --- /dev/null +++ b/docker/demo/config/test-suite/deltastreamer-pure-inserts.yaml @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +dag_name: deltastreamer-pure-inserts.yaml +dag_rounds: 10 +dag_intermittent_delay_mins: 0 +dag_content: + first_insert: + config: + record_size: 200 + num_partitions_insert: 10 + repeat_count: 3 + num_records_insert: 5000 + type: InsertNode + deps: none + second_validate: + config: + validate_hive: false + delete_input_data: false + type: ValidateDatasetNode + deps: first_insert + last_validate: + config: + execute_itr_count: 10 + type: ValidateAsyncOperations + deps: second_validate \ No newline at end of file diff --git a/docker/demo/config/test-suite/insert-overwrite.yaml b/docker/demo/config/test-suite/insert-overwrite.yaml index dc185d5938f6d..7e54cea6a910d 100644 --- a/docker/demo/config/test-suite/insert-overwrite.yaml +++ b/docker/demo/config/test-suite/insert-overwrite.yaml @@ -17,7 +17,6 @@ dag_name: simple-deltastreamer.yaml dag_rounds: 1 dag_intermittent_delay_mins: 1 dag_content: - first_insert: config: record_size: 1000 @@ -91,4 +90,4 @@ dag_content: validate_hive: false delete_input_data: false type: ValidateDatasetNode - deps: third_upsert + deps: third_upsert \ No newline at end of file diff --git a/docker/demo/config/test-suite/multi-writer-1-ds.yaml b/docker/demo/config/test-suite/multi-writer-1-ds.yaml index 3fe33b671dc39..3476d8075a6ed 100644 --- a/docker/demo/config/test-suite/multi-writer-1-ds.yaml +++ b/docker/demo/config/test-suite/multi-writer-1-ds.yaml @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
dag_name: simple-deltastreamer.yaml -dag_rounds: 3 +dag_rounds: 6 dag_intermittent_delay_mins: 0 dag_content: first_insert: diff --git a/docker/demo/config/test-suite/multi-writer-1-sds.yaml b/docker/demo/config/test-suite/multi-writer-1-sds.yaml new file mode 100644 index 0000000000000..d60a8ba6d78a6 --- /dev/null +++ b/docker/demo/config/test-suite/multi-writer-1-sds.yaml @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +dag_name: cow-spark-simple.yaml +dag_rounds: 6 +dag_intermittent_delay_mins: 0 +dag_content: + first_insert: + config: + record_size: 1000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 100000 + start_partition: 1 + type: SparkInsertNode + deps: none + first_upsert: + config: + record_size: 1000 + num_partitions_insert: 1 + num_records_insert: 50000 + repeat_count: 1 + num_records_upsert: 50000 + num_partitions_upsert: 1 + start_partition: 1 + type: SparkUpsertNode + deps: first_insert + first_delete: + config: + num_partitions_delete: 0 + num_records_delete: 10000 + start_partition: 1 + type: SparkDeleteNode + deps: first_upsert + second_validate: + config: + validate_hive: false + delete_input_data: true + type: ValidateDatasetNode + deps: first_delete \ No newline at end of file diff --git a/docker/demo/config/test-suite/multi-writer-2-sds.yaml b/docker/demo/config/test-suite/multi-writer-2-sds.yaml index 9242dd26051ec..702065c672112 100644 --- a/docker/demo/config/test-suite/multi-writer-2-sds.yaml +++ b/docker/demo/config/test-suite/multi-writer-2-sds.yaml @@ -14,8 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. dag_name: cow-spark-simple.yaml -dag_rounds: 3 -dag_intermittent_delay_mins: 0 +dag_rounds: 5 +dag_intermittent_delay_mins: 1 dag_content: first_insert: config: diff --git a/docker/demo/config/test-suite/multi-writer-3-sds.yaml b/docker/demo/config/test-suite/multi-writer-3-sds.yaml new file mode 100644 index 0000000000000..9ad21f467d50b --- /dev/null +++ b/docker/demo/config/test-suite/multi-writer-3-sds.yaml @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +dag_name: cow-spark-simple.yaml +dag_rounds: 4 +dag_intermittent_delay_mins: 1 +dag_content: + first_insert: + config: + record_size: 1000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 100000 + start_partition: 20 + type: SparkInsertNode + deps: none + first_upsert: + config: + record_size: 1000 + num_partitions_insert: 1 + num_records_insert: 50000 + repeat_count: 1 + num_records_upsert: 50000 + num_partitions_upsert: 1 + start_partition: 20 + type: SparkUpsertNode + deps: first_insert + first_delete: + config: + num_partitions_delete: 0 + num_records_delete: 10000 + start_partition: 20 + type: SparkDeleteNode + deps: first_upsert + second_validate: + config: + validate_hive: false + delete_input_data: true + type: ValidateDatasetNode + deps: first_delete \ No newline at end of file diff --git a/docker/demo/config/test-suite/multi-writer-4-sds.yaml b/docker/demo/config/test-suite/multi-writer-4-sds.yaml new file mode 100644 index 0000000000000..74dfa1cb4ba6a --- /dev/null +++ b/docker/demo/config/test-suite/multi-writer-4-sds.yaml @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +dag_name: cow-spark-simple.yaml +dag_rounds: 4 +dag_intermittent_delay_mins: 1 +dag_content: + first_insert: + config: + record_size: 1000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 100000 + start_partition: 30 + type: SparkInsertNode + deps: none + first_upsert: + config: + record_size: 1000 + num_partitions_insert: 1 + num_records_insert: 50000 + repeat_count: 1 + num_records_upsert: 50000 + num_partitions_upsert: 1 + start_partition: 30 + type: SparkUpsertNode + deps: first_insert + first_delete: + config: + num_partitions_delete: 0 + num_records_delete: 10000 + start_partition: 30 + type: SparkDeleteNode + deps: first_upsert + second_validate: + config: + validate_hive: false + delete_input_data: true + type: ValidateDatasetNode + deps: first_delete \ No newline at end of file diff --git a/docker/demo/config/test-suite/multi-writer-local-3.properties b/docker/demo/config/test-suite/multi-writer-local-3.properties new file mode 100644 index 0000000000000..48f0f0b1ace8b --- /dev/null +++ b/docker/demo/config/test-suite/multi-writer-local-3.properties @@ -0,0 +1,57 @@ + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +hoodie.insert.shuffle.parallelism=2 +hoodie.upsert.shuffle.parallelism=2 +hoodie.bulkinsert.shuffle.parallelism=2 +hoodie.delete.shuffle.parallelism=2 + +hoodie.metadata.enable=false + +hoodie.deltastreamer.source.test.num_partitions=100 +hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false +hoodie.deltastreamer.source.test.max_unique_records=100000000 +hoodie.embed.timeline.server=false +hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector + +hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector +hoodie.datasource.hive_sync.skip_ro_suffix=true + +hoodie.datasource.write.recordkey.field=_row_key +hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator +hoodie.datasource.write.partitionpath.field=timestamp + +hoodie.write.concurrency.mode=optimistic_concurrency_control +hoodie.cleaner.policy.failed.writes=LAZY +hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider + +hoodie.deltastreamer.source.dfs.root=/tmp/hudi/input3 +hoodie.deltastreamer.schemaprovider.target.schema.file=file:/tmp/source.avsc +hoodie.deltastreamer.schemaprovider.source.schema.file=file:/tmp/source.avsc +hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP +hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd + +hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/ +hoodie.datasource.hive_sync.database=testdb +hoodie.datasource.hive_sync.table=table1 +hoodie.datasource.hive_sync.assume_date_partitioning=false +hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path +hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor + diff --git a/docker/demo/config/test-suite/multi-writer-local-4.properties b/docker/demo/config/test-suite/multi-writer-local-4.properties new file mode 100644 index 0000000000000..4b5120928ccb1 --- /dev/null +++ b/docker/demo/config/test-suite/multi-writer-local-4.properties @@ -0,0 +1,57 @@ + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +hoodie.insert.shuffle.parallelism=2 +hoodie.upsert.shuffle.parallelism=2 +hoodie.bulkinsert.shuffle.parallelism=2 +hoodie.delete.shuffle.parallelism=2 + +hoodie.metadata.enable=false + +hoodie.deltastreamer.source.test.num_partitions=100 +hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false +hoodie.deltastreamer.source.test.max_unique_records=100000000 +hoodie.embed.timeline.server=false +hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector + +hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector +hoodie.datasource.hive_sync.skip_ro_suffix=true + +hoodie.datasource.write.recordkey.field=_row_key +hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator +hoodie.datasource.write.partitionpath.field=timestamp + +hoodie.write.concurrency.mode=optimistic_concurrency_control +hoodie.cleaner.policy.failed.writes=LAZY +hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider + +hoodie.deltastreamer.source.dfs.root=/tmp/hudi/input4 +hoodie.deltastreamer.schemaprovider.target.schema.file=file:/tmp/source.avsc +hoodie.deltastreamer.schemaprovider.source.schema.file=file:/tmp/source.avsc +hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP +hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd + +hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/ +hoodie.datasource.hive_sync.database=testdb +hoodie.datasource.hive_sync.table=table1 +hoodie.datasource.hive_sync.assume_date_partitioning=false +hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path +hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor + diff --git a/docker/demo/config/test-suite/spark-delete-partition.yaml b/docker/demo/config/test-suite/spark-delete-partition.yaml new file mode 100644 index 0000000000000..1d23fa7b0851c --- /dev/null +++ b/docker/demo/config/test-suite/spark-delete-partition.yaml @@ -0,0 +1,57 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+dag_name: spark-delete-partition.yaml +dag_rounds: 1 +dag_intermittent_delay_mins: 1 +dag_content: + first_insert: + config: + record_size: 1000 + num_partitions_insert: 5 + repeat_count: 1 + num_records_insert: 10 + type: SparkInsertNode + deps: none + first_delete_partition: + config: + partitions_to_delete: "1970/01/01" + type: SparkDeletePartitionNode + deps: first_insert + second_validate: + config: + validate_full_data : true + input_partitions_to_skip_validate : "0" + validate_hive: false + delete_input_data: false + type: ValidateDatasetNode + deps: first_delete_partition + second_insert: + config: + record_size: 1000 + num_partitions_insert: 5 + repeat_count: 1 + num_records_insert: 10 + start_partition: 2 + type: SparkInsertNode + deps: second_validate + third_validate: + config: + validate_full_data : true + input_partitions_to_skip_validate : "0" + validate_hive: false + delete_input_data: false + type: ValidateDatasetNode + deps: second_insert \ No newline at end of file diff --git a/docker/demo/config/test-suite/spark-immutable-dataset.yaml b/docker/demo/config/test-suite/spark-immutable-dataset.yaml new file mode 100644 index 0000000000000..d6cbf1b244de5 --- /dev/null +++ b/docker/demo/config/test-suite/spark-immutable-dataset.yaml @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +dag_name: spark-immutable-dataset.yaml +dag_rounds: 5 +dag_intermittent_delay_mins: 0 +dag_content: + first_bulk_insert: + config: + record_size: 200 + num_partitions_insert: 10 + repeat_count: 5 + num_records_insert: 5000 + type: SparkBulkInsertNode + deps: none + first_validate: + config: + validate_hive: false + delete_input_data: false + type: ValidateDatasetNode + deps: first_bulk_insert + first_insert: + config: + record_size: 200 + num_partitions_insert: 10 + repeat_count: 5 + num_records_insert: 5000 + type: SparkInsertNode + deps: first_validate + second_validate: + config: + validate_hive: false + delete_input_data: false + type: ValidateDatasetNode + deps: first_insert + last_validate: + config: + execute_itr_count: 5 + delete_input_data: true + type: ValidateAsyncOperations + deps: second_validate \ No newline at end of file diff --git a/docker/demo/config/test-suite/spark-non-core-operations.yaml b/docker/demo/config/test-suite/spark-non-core-operations.yaml new file mode 100644 index 0000000000000..f7189ce4587c8 --- /dev/null +++ b/docker/demo/config/test-suite/spark-non-core-operations.yaml @@ -0,0 +1,204 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +dag_name: spark-non-core-operations.yaml +dag_rounds: 1 +dag_intermittent_delay_mins: 1 +dag_content: + first_insert: + config: + record_size: 1000 + num_partitions_insert: 10 + repeat_count: 1 + num_records_insert: 10000 + type: SparkInsertNode + deps: none + first_upsert: + config: + record_size: 1000 + num_partitions_insert: 10 + num_records_insert: 1000 + repeat_count: 1 + num_records_upsert: 8000 + num_partitions_upsert: 10 + type: SparkUpsertNode + deps: first_insert + second_insert: + config: + record_size: 1000 + num_partitions_insert: 10 + repeat_count: 1 + num_records_insert: 10000 + type: SparkInsertNode + deps: first_upsert + second_upsert: + config: + record_size: 1000 + num_partitions_insert: 10 + num_records_insert: 1000 + repeat_count: 1 + num_records_upsert: 8000 + num_partitions_upsert: 10 + type: SparkUpsertNode + deps: second_insert + first_insert_overwrite: + config: + record_size: 1000 + num_partitions_insert: 10 + repeat_count: 1 + num_records_insert: 10 + type: SparkInsertOverwriteNode + deps: second_upsert + delete_all_input_except_last: + config: + delete_input_data_except_latest: true + type: DeleteInputDatasetNode + deps: first_insert_overwrite + third_insert: + config: + record_size: 1000 + num_partitions_insert: 10 + repeat_count: 1 + num_records_insert: 10000 + type: SparkInsertNode + deps: delete_all_input_except_last + third_upsert: + config: + record_size: 1000 + num_partitions_insert: 10 + num_records_insert: 1000 + repeat_count: 1 + num_records_upsert: 8000 + num_partitions_upsert: 10 + type: SparkUpsertNode + deps: third_insert + second_validate: + config: + validate_full_data : true + validate_hive: false + delete_input_data: false + type: ValidateDatasetNode + deps: third_upsert + fourth_insert: + config: + record_size: 1000 + num_partitions_insert: 10 + repeat_count: 1 + num_records_insert: 10000 + type: SparkInsertNode + deps: second_validate + fourth_upsert: + config: + record_size: 1000 + num_partitions_insert: 10 + num_records_insert: 1000 + repeat_count: 1 + num_records_upsert: 8000 + num_partitions_upsert: 10 + type: SparkUpsertNode + deps: fourth_insert + fifth_insert: + config: + record_size: 1000 + num_partitions_insert: 10 + repeat_count: 1 + num_records_insert: 10000 + type: SparkInsertNode + deps: fourth_upsert + fifth_upsert: + config: + record_size: 1000 + num_partitions_insert: 10 + num_records_insert: 1000 + repeat_count: 1 + num_records_upsert: 8000 + num_partitions_upsert: 10 + type: SparkUpsertNode + deps: fifth_insert + first_insert_overwrite_table: + config: + record_size: 1000 + repeat_count: 1 + num_records_insert: 10 + type: SparkInsertOverwriteTableNode + deps: fifth_upsert + second_delete_all_input_except_last: + config: + delete_input_data_except_latest: true + type: DeleteInputDatasetNode + deps: first_insert_overwrite_table + sixth_insert: + config: + record_size: 1000 + num_partitions_insert: 10 + repeat_count: 1 + num_records_insert: 10000 + type: 
SparkInsertNode + deps: second_delete_all_input_except_last + sixth_upsert: + config: + record_size: 1000 + num_partitions_insert: 10 + num_records_insert: 1000 + repeat_count: 1 + num_records_upsert: 8000 + num_partitions_upsert: 10 + type: SparkUpsertNode + deps: sixth_insert + third_validate: + config: + validate_full_data : true + validate_hive: false + delete_input_data: false + type: ValidateDatasetNode + deps: sixth_upsert + seventh_insert: + config: + record_size: 1000 + num_partitions_insert: 5 + repeat_count: 1 + num_records_insert: 10 + type: SparkInsertNode + deps: third_validate + first_delete_partition: + config: + partitions_to_delete: "1970/01/01" + type: SparkDeletePartitionNode + deps: seventh_insert + fourth_validate: + config: + validate_full_data : true + input_partitions_to_skip_validate : "0" + validate_hive: false + delete_input_data: false + type: ValidateDatasetNode + deps: first_delete_partition + eigth_insert: + config: + record_size: 1000 + num_partitions_insert: 5 + repeat_count: 1 + num_records_insert: 10 + start_partition: 2 + type: SparkInsertNode + deps: fourth_validate + fifth_validate: + config: + validate_full_data : true + input_partitions_to_skip_validate : "0" + validate_hive: false + delete_input_data: false + type: ValidateDatasetNode + deps: eigth_insert \ No newline at end of file diff --git a/docker/demo/config/test-suite/spark-pure-bulk-inserts.yaml b/docker/demo/config/test-suite/spark-pure-bulk-inserts.yaml new file mode 100644 index 0000000000000..f82705cea3cec --- /dev/null +++ b/docker/demo/config/test-suite/spark-pure-bulk-inserts.yaml @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +dag_name: spark-pure-bulk-inserts.yaml +dag_rounds: 5 +dag_intermittent_delay_mins: 0 +dag_content: + first_bulk_insert: + config: + record_size: 200 + num_partitions_insert: 10 + repeat_count: 4 + num_records_insert: 5000 + type: SparkBulkInsertNode + deps: none + second_validate: + config: + validate_hive: false + delete_input_data: false + type: ValidateDatasetNode + deps: first_bulk_insert + last_validate: + config: + execute_itr_count: 5 + type: ValidateAsyncOperations + deps: second_validate \ No newline at end of file diff --git a/docker/demo/config/test-suite/spark-pure-inserts.yaml b/docker/demo/config/test-suite/spark-pure-inserts.yaml new file mode 100644 index 0000000000000..13482f988c70c --- /dev/null +++ b/docker/demo/config/test-suite/spark-pure-inserts.yaml @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: spark-pure-inserts.yaml
+dag_rounds: 5
+dag_intermittent_delay_mins: 1
+dag_content:
+  first_insert:
+    config:
+      record_size: 200
+      num_partitions_insert: 10
+      repeat_count: 3
+      num_records_insert: 5000
+    type: SparkInsertNode
+    deps: none
+  second_validate:
+    config:
+      validate_hive: false
+      delete_input_data: false
+    type: ValidateDatasetNode
+    deps: first_insert
+  last_validate:
+    config:
+      execute_itr_count: 10
+    type: ValidateAsyncOperations
+    deps: second_validate
\ No newline at end of file
diff --git a/hudi-integ-test/README.md b/hudi-integ-test/README.md
index 6c1bad138cc18..5d26d03a20a89 100644
--- a/hudi-integ-test/README.md
+++ b/hudi-integ-test/README.md
@@ -522,6 +522,78 @@ Spark submit with the flag:
 --saferSchemaEvolution
 ```
+### Multi-writer tests
+The integ test framework also supports multi-writer tests.
+
+#### Multi-writer tests with a deltastreamer and a spark datasource writer
+
+Sample spark-submit command to test one deltastreamer and one spark datasource writer running concurrently against the same table:
+```shell
+./bin/spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0 \
+--conf spark.task.cpus=3 --conf spark.executor.cores=3 \
+--conf spark.task.maxFailures=100 --conf spark.memory.fraction=0.4 \
+--conf spark.rdd.compress=true --conf spark.kryoserializer.buffer.max=2000m \
+--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+--conf spark.memory.storageFraction=0.1 --conf spark.shuffle.service.enabled=true \
+--conf spark.sql.hive.convertMetastoreParquet=false --conf spark.driver.maxResultSize=12g \
+--conf spark.executor.heartbeatInterval=120s --conf spark.network.timeout=600s \
+--conf spark.yarn.max.executor.failures=10 \
+--conf spark.sql.catalogImplementation=hive \
+--class org.apache.hudi.integ.testsuite.HoodieMultiWriterTestSuiteJob \
+/packaging/hudi-integ-test-bundle/target/hudi-integ-test-bundle-0.12.0-SNAPSHOT.jar \
+--source-ordering-field test_suite_source_ordering_field \
+--use-deltastreamer \
+--target-base-path /tmp/hudi/output \
+--input-base-paths "/tmp/hudi/input1,/tmp/hudi/input2" \
+--target-table table1 \
+--props-paths "file:/docker/demo/config/test-suite/multi-writer-local-1.properties,file:/hudi/docker/demo/config/test-suite/multi-writer-local-2.properties" \
+--schemaprovider-class org.apache.hudi.integ.testsuite.schema.TestSuiteFileBasedSchemaProvider \
+--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
+--input-file-size 125829120 \
+--workload-yaml-paths "file:/docker/demo/config/test-suite/multi-writer-1-ds.yaml,file:/docker/demo/config/test-suite/multi-writer-2-sds.yaml" \
+--workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \
+--table-type COPY_ON_WRITE \
+--compact-scheduling-minshare 1 \
+--input-base-path "dummyValue" \
+--workload-yaml-path "dummyValue" \
+--props "dummyValue" \
+--use-hudi-data-to-generate-updates
+```
+
+#### Multi-writer tests with 4 concurrent spark datasource writers
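+
+Sample spark-submit command to test 4 concurrent spark datasource writers. Each writer gets its own input path, properties file and workload yaml (multi-writer-local-1.properties through multi-writer-local-4.properties and multi-writer-1-sds.yaml through multi-writer-4-sds.yaml in this patch). As a quick reference, the multi-writer properties files used here enable optimistic concurrency control; for example, multi-writer-local-3.properties in this patch carries the settings below (the in-process lock provider only coordinates writers inside a single JVM, so treat it as an illustrative choice for this local test rather than a general recommendation):
+
+```properties
+# allow several writers to commit to the same table
+hoodie.write.concurrency.mode=optimistic_concurrency_control
+# clean up data from failed writes lazily instead of rolling back eagerly
+hoodie.cleaner.policy.failed.writes=LAZY
+# lock provider used to coordinate the concurrent writers (single-JVM variant)
+hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
+```
+
+The corresponding spark-submit command is: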
+
+```shell
+./bin/spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0 \
+--conf spark.task.cpus=3 --conf spark.executor.cores=3 \
+--conf spark.task.maxFailures=100 --conf spark.memory.fraction=0.4 \
+--conf spark.rdd.compress=true --conf spark.kryoserializer.buffer.max=2000m \
+--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+--conf spark.memory.storageFraction=0.1 --conf spark.shuffle.service.enabled=true \
+--conf spark.sql.hive.convertMetastoreParquet=false --conf spark.driver.maxResultSize=12g \
+--conf spark.executor.heartbeatInterval=120s --conf spark.network.timeout=600s \
+--conf spark.yarn.max.executor.failures=10 --conf spark.sql.catalogImplementation=hive \
+--class org.apache.hudi.integ.testsuite.HoodieMultiWriterTestSuiteJob \
+/hudi-integ-test-bundle-0.12.0-SNAPSHOT.jar \
+--source-ordering-field test_suite_source_ordering_field \
+--use-deltastreamer \
+--target-base-path /tmp/hudi/output \
+--input-base-paths "/tmp/hudi/input1,/tmp/hudi/input2,/tmp/hudi/input3,/tmp/hudi/input4" \
+--target-table table1 \
+--props-paths "file:/multi-writer-local-1.properties,file:/multi-writer-local-2.properties,file:/multi-writer-local-3.properties,file:/multi-writer-local-4.properties" \
+--schemaprovider-class org.apache.hudi.integ.testsuite.schema.TestSuiteFileBasedSchemaProvider \
+--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
+--input-file-size 125829120 \
+--workload-yaml-paths "file:/multi-writer-1-sds.yaml,file:/multi-writer-2-sds.yaml,file:/multi-writer-3-sds.yaml,file:/multi-writer-4-sds.yaml" \
+--workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \
+--table-type COPY_ON_WRITE \
+--compact-scheduling-minshare 1 \
+--input-base-path "dummyValue" \
+--workload-yaml-path "dummyValue" \
+--props "dummyValue" \
+--use-hudi-data-to-generate-updates
+```
+
+
 ## Automated tests for N no of yamls in Local Docker environment
 Hudi provides a script to assist you in testing N no of yamls automatically. Checkout the script under
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieMultiWriterTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieMultiWriterTestSuiteJob.java
index 6cff499825566..87d2f587597a0 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieMultiWriterTestSuiteJob.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieMultiWriterTestSuiteJob.java
@@ -30,6 +30,7 @@
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -116,6 +117,7 @@ public static void main(String[] args) throws Exception {
     }
     ExecutorService executor = Executors.newFixedThreadPool(inputPaths.length);
+    Random random = new Random();
     List testSuiteConfigList = new ArrayList<>();
     int jobIndex = 0;
@@ -131,11 +133,20 @@ public static void main(String[] args) throws Exception {
     AtomicBoolean jobFailed = new AtomicBoolean(false);
     AtomicInteger counter = new AtomicInteger(0);
+    List<Long> waitTimes = new ArrayList<>();
+    for (int i = 0; i < jobIndex; i++) {
+      if (i == 0) {
+        waitTimes.add(0L);
+      } else {
+        // every job after the first starts after 1 min plus a small random delta.
+ waitTimes.add(60000L + random.nextInt(10000)); + } + } List> completableFutureList = new ArrayList<>(); testSuiteConfigList.forEach(hoodieTestSuiteConfig -> { try { // start each job at 20 seconds interval so that metaClient instantiation does not overstep - Thread.sleep(counter.get() * 20000); + Thread.sleep(waitTimes.get(counter.get())); LOG.info("Starting job " + hoodieTestSuiteConfig.toString()); } catch (InterruptedException e) { e.printStackTrace(); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java index d7280402d2d5d..f80b91dacf589 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java @@ -99,6 +99,8 @@ public static class Config { private static String ENABLE_METADATA_VALIDATE = "enable_metadata_validate"; private static String VALIDATE_FULL_DATA = "validate_full_data"; private static String DELETE_INPUT_DATA_EXCEPT_LATEST = "delete_input_data_except_latest"; + private static String PARTITIONS_TO_DELETE = "partitions_to_delete"; + private static String INPUT_PARTITIONS_TO_SKIP_VALIDATE = "input_partitions_to_skip_validate"; // Spark SQL Create Table private static String TABLE_TYPE = "table_type"; @@ -201,6 +203,10 @@ public boolean isDisableIngest() { return Boolean.valueOf(configsMap.getOrDefault(DISABLE_INGEST, false).toString()); } + public String getPartitionsToDelete() { + return configsMap.getOrDefault(PARTITIONS_TO_DELETE, "").toString(); + } + public boolean getReinitContext() { return Boolean.valueOf(configsMap.getOrDefault(REINIT_CONTEXT, false).toString()); } @@ -221,6 +227,10 @@ public int validateOnceEveryIteration() { return Integer.valueOf(configsMap.getOrDefault(VALIDATE_ONCE_EVERY_ITR, 1).toString()); } + public String inputPartitonsToSkipWithValidate() { + return configsMap.getOrDefault(INPUT_PARTITIONS_TO_SKIP_VALIDATE, "").toString(); + } + public boolean isValidateFullData() { return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_FULL_DATA, false).toString()); } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java index b5c661cb085f6..a0ebdc5754716 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java @@ -163,8 +163,13 @@ private Dataset getInputDf(ExecutionContext context, SparkSession session, // todo: fix hard coded fields from configs. // read input and resolve insert, updates, etc. Dataset inputDf = session.read().format("avro").load(inputPath); + Dataset trimmedDf = inputDf; + if (!config.inputPartitonsToSkipWithValidate().isEmpty()) { + trimmedDf = inputDf.filter("instr("+partitionPathField+", \'"+ config.inputPartitonsToSkipWithValidate() +"\') != 1"); + } + ExpressionEncoder encoder = getEncoder(inputDf.schema()); - return inputDf.groupByKey( + return trimmedDf.groupByKey( (MapFunction) value -> (partitionPathField.isEmpty() ? 
value.getAs(recordKeyField) : (value.getAs(partitionPathField) + "+" + value.getAs(recordKeyField))), Encoders.STRING()) .reduceGroups((ReduceFunction) (v1, v2) -> { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java index e7bc7b00a82a4..c30be2a2a5d2c 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java @@ -123,7 +123,7 @@ public JavaRDD generateInserts(Config operation) { int startPartition = operation.getStartPartition(); // Each spark partition below will generate records for a single partition given by the integer index. - List partitionIndexes = IntStream.rangeClosed(0 + startPartition, numPartitions + startPartition) + List partitionIndexes = IntStream.rangeClosed(0 + startPartition, numPartitions + startPartition - 1) .boxed().collect(Collectors.toList()); JavaRDD inputBatch = jsc.parallelize(partitionIndexes, numPartitions) diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java index 59f02de0ac1a6..a936a81665116 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java @@ -338,7 +338,7 @@ public boolean validate(GenericRecord record) { */ @VisibleForTesting public GenericRecord updateTimestamp(GenericRecord record, String fieldName) { - long delta = TimeUnit.SECONDS.convert((++partitionIndex % numDatePartitions) + startPartition, TimeUnit.DAYS); + long delta = TimeUnit.SECONDS.convert((partitionIndex++ % numDatePartitions) + startPartition, TimeUnit.DAYS); record.put(fieldName, delta); return record; } diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkBulkInsertNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkBulkInsertNode.scala index ac254bea8dad0..b426f87071127 100644 --- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkBulkInsertNode.scala +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkBulkInsertNode.scala @@ -19,49 +19,18 @@ package org.apache.hudi.integ.testsuite.dag.nodes import org.apache.hudi.client.WriteStatus -import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config -import org.apache.hudi.integ.testsuite.dag.ExecutionContext -import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions} +import org.apache.hudi.DataSourceWriteOptions import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SaveMode - -import scala.collection.JavaConverters._ /** * Spark datasource based bulk insert node * * @param dagNodeConfig DAG node configurations. */ -class SparkBulkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] { - - config = dagNodeConfig +class SparkBulkInsertNode(dagNodeConfig: Config) extends SparkInsertNode(dagNodeConfig) { - /** - * Execute the {@link DagNode}. - * - * @param context The context needed for an execution of a node. 
- * @param curItrCount iteration count for executing the node. - * @throws Exception Thrown if the execution failed. - */ - override def execute(context: ExecutionContext, curItrCount: Int): Unit = { - if (!config.isDisableGenerate) { - context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).getValue().count() - } - val inputDF = AvroConversionUtils.createDataFrame(context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch, - context.getWriterContext.getHoodieTestSuiteWriter.getSchema, - context.getWriterContext.getSparkSession) - val saveMode = if(curItrCount == 0) SaveMode.Overwrite else SaveMode.Append - inputDF.write.format("hudi") - .options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap)) - .option(DataSourceWriteOptions.TABLE_NAME.key(), context.getHoodieTestSuiteWriter.getCfg.targetTableName) - .option(DataSourceWriteOptions.TABLE_TYPE.key(), context.getHoodieTestSuiteWriter.getCfg.tableType) - .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) - .option(DataSourceWriteOptions.ENABLE_ROW_WRITER.key(), String.valueOf(config.enableRowWriting())) - .option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX.key(), "deltastreamer.checkpoint.key") - .option("deltastreamer.checkpoint.key", context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse("")) - .option(HoodieWriteConfig.TBL_NAME.key(), context.getHoodieTestSuiteWriter.getCfg.targetTableName) - .mode(saveMode) - .save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath) + override def getOperation(): String = { + DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL } } diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeletePartitionNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeletePartitionNode.scala new file mode 100644 index 0000000000000..9354deea28bb0 --- /dev/null +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeletePartitionNode.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes
+
+
+import org.apache.avro.Schema
+import org.apache.hudi.client.WriteStatus
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext
+import org.apache.hudi.integ.testsuite.schema.SchemaUtils
+import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats
+import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkUtils}
+import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SaveMode
+
+import scala.collection.JavaConverters._
+
+/**
+ * Spark datasource based delete partition node
+ *
+ * @param dagNodeConfig DAG node configurations.
+ */
+class SparkDeletePartitionNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
+
+  private val log = LogManager.getLogger(getClass)
+  config = dagNodeConfig
+
+  /**
+   * Execute the {@link DagNode}.
+   *
+   * @param context     The context needed for an execution of a node.
+   * @param curItrCount iteration count for executing the node.
+   * @throws Exception Thrown if the execution failed.
+   */
+  override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
+    log.info("Executing delete partition node " + this.getName)
+
+    // issue a delete_partition write against the target table for the configured partitions
+    context.getWriterContext.getSparkSession.emptyDataFrame.write.format("hudi")
+      .options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap))
+      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), SchemaUtils.SOURCE_ORDERING_FIELD)
+      .option(DataSourceWriteOptions.TABLE_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL)
+      .option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
+      .option(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key, config.getPartitionsToDelete)
+      .mode(SaveMode.Append)
+      .save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 4423874ab8e8c..a7d8df35f3739 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -212,7 +212,6 @@ object HoodieSparkSqlWriter {
           (writeStatuses, client)
         }
       case WriteOperationType.DELETE_PARTITION => {
-        val genericRecords = registerKryoClassesAndGetGenericRecords(tblName, sparkContext, df, reconcileSchema)
         if (!tableExists) {
           throw new HoodieException(s"hoodie table at $basePath does not exist")
         }
@@ -222,6 +221,7 @@ object HoodieSparkSqlWriter {
          val partitionColsToDelete = parameters(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key()).split(",")
          java.util.Arrays.asList(partitionColsToDelete: _*)
        } else {
+          val genericRecords = registerKryoClassesAndGetGenericRecords(tblName, sparkContext, df, reconcileSchema)
          genericRecords.map(gr => keyGenerator.getKey(gr).getPartitionPath).toJavaRDD().distinct().collect()
        }
        // Create a HoodieWriteClient & issue the delete.
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index 111a46261c769..339dbb5c715ef 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -20,7 +20,6 @@ package org.apache.hudi import java.io.IOException import java.time.Instant import java.util.{Collections, Date, UUID} - import org.apache.commons.io.FileUtils import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.client.SparkRDDWriteClient