diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
index dee91b2828aee..9970687abb4f2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
@@ -27,6 +27,7 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.exception.HoodieException;
 
+import org.apache.avro.Schema;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -40,6 +41,7 @@ public class CommitUtils {
 
   private static final Logger LOG = LogManager.getLogger(CommitUtils.class);
+  private static final String NULL_SCHEMA_STR = Schema.create(Schema.Type.NULL).toString();
 
   /**
    * Gets the commit action type for given write operation and table type.
@@ -84,7 +86,8 @@ public static HoodieCommitMetadata buildMetadata(List<HoodieWriteStat> writeStat
     if (extraMetadata.isPresent()) {
       extraMetadata.get().forEach(commitMetadata::addMetadata);
     }
-    commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaToStoreInCommit == null ? "" : schemaToStoreInCommit);
+    commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, (schemaToStoreInCommit == null || schemaToStoreInCommit.equals(NULL_SCHEMA_STR))
+        ? "" : schemaToStoreInCommit);
     commitMetadata.setOperationType(operationType);
     return commitMetadata;
   }
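Worth spelling out why the guard compares against `NULL_SCHEMA_STR`: Avro serializes the NULL schema as the JSON string `"null"`, which is exactly what gets inferred for an empty batch. A minimal, self-contained sketch of the behavior this hunk encodes (not part of the patch):

```java
import org.apache.avro.Schema;

public class NullSchemaStrDemo {
  public static void main(String[] args) {
    // Avro renders the NULL schema as the JSON string "null" (quotes included).
    String nullSchemaStr = Schema.create(Schema.Type.NULL).toString();
    System.out.println(nullSchemaStr); // "null"

    // The guard in buildMetadata() maps both a missing schema and the NULL
    // schema to an empty string, so an empty commit never records "null"
    // as the table schema.
    String schemaToStoreInCommit = nullSchemaStr;
    String stored = (schemaToStoreInCommit == null || schemaToStoreInCommit.equals(nullSchemaStr))
        ? "" : schemaToStoreInCommit;
    System.out.println(stored.isEmpty()); // true
  }
}
```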
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 1c80896586515..aa233d4e37d3e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -70,6 +70,7 @@
 import org.apache.hudi.utilities.sources.ParquetDFSSource;
 import org.apache.hudi.utilities.sources.SqlSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
+import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch;
 import org.apache.hudi.utilities.testutils.JdbcTestUtils;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource;
@@ -130,6 +131,7 @@
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -1420,15 +1422,34 @@ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTrans
   }
 
   private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
+    testParquetDFSSource(useSchemaProvider, transformerClassNames, false);
+  }
+
+  private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch) throws Exception {
     prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null);
     String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
-        TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName()
+            : ParquetDFSSource.class.getName(),
            transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
            useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
     deltaStreamer.sync();
     TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
     testNum++;
+
+    if (testEmptyBatch) {
+      prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
+      // have the parquet source return an empty batch
+      TestParquetDFSSourceEmptyBatch.returnEmptyBatch = true;
+      deltaStreamer.sync();
+      // since we mimicked an empty batch, the total record count should match the first sync().
+      TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
+
+      // validate that table schema resolution fetches a valid schema from the last-but-one commit.
+      TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
+      assertNotEquals(Schema.create(Schema.Type.NULL).toString(), tableSchemaResolver.getTableAvroSchema().toString());
+    }
   }
 
   private void testORCDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
@@ -1584,6 +1605,11 @@ public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws E
     testParquetDFSSource(false, null);
   }
 
+  @Test
+  public void testParquetDFSSourceForEmptyBatch() throws Exception {
+    testParquetDFSSource(false, null, true);
+  }
+
   @Test
   public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
     testParquetDFSSource(false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
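One fix applied to the assertion above: the submitted line compared the `Schema` returned by `getTableAvroSchema()` directly against a `String`, which `assertNotEquals` accepts but which can never be equal, so the check passed vacuously. The rewrite compares serialized schema strings. A self-contained JUnit 5 sketch of the pitfall:

```java
import static org.junit.jupiter.api.Assertions.assertNotEquals;

import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;

class SchemaAssertionDemo {

  @Test
  void schemaVsStringIsVacuous() {
    Schema nullSchema = Schema.create(Schema.Type.NULL);
    // A Schema instance never equals a String, so this passes no matter
    // what schema the table actually has:
    assertNotEquals(nullSchema, nullSchema.toString());
    // Comparing serialized forms exercises the real check:
    assertNotEquals(nullSchema.toString(), Schema.create(Schema.Type.STRING).toString());
  }
}
```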
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSourceEmptyBatch.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSourceEmptyBatch.java
new file mode 100644
index 0000000000000..3129e91a9d3e0
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSourceEmptyBatch.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+public class TestParquetDFSSourceEmptyBatch extends ParquetDFSSource {
+
+  public static boolean returnEmptyBatch;
+
+  public TestParquetDFSSourceEmptyBatch(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+                                        SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+  }
+
+  @Override
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
+    Pair<Option<Dataset<Row>>, String> toReturn = super.fetchNextBatch(lastCkptStr, sourceLimit);
+    if (returnEmptyBatch) {
+      return Pair.of(Option.empty(), toReturn.getRight());
+    }
+    return toReturn;
+  }
+}
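For reviewers trying the new source locally, a sketch of the intended toggle pattern, as a fragment in the context of `testParquetDFSSource` above. Since `returnEmptyBatch` is a mutable static, it is worth resetting it between tests to avoid cross-test leakage:

```java
// first sync ingests the prepared parquet files normally
TestParquetDFSSourceEmptyBatch.returnEmptyBatch = false;
deltaStreamer.sync();

// second sync sees data on disk but the source reports an empty batch,
// exercising the empty-commit and schema-fallback path
TestParquetDFSSourceEmptyBatch.returnEmptyBatch = true;
deltaStreamer.sync();

// reset so later tests that reuse this source class are unaffected
TestParquetDFSSourceEmptyBatch.returnEmptyBatch = false;
```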