
Commit 8053f01

Fixing NULL schema provider for empty batch
1 parent 4e09545 commit 8053f01

3 files changed: 87 additions & 2 deletions


hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java

Lines changed: 6 additions & 1 deletion
@@ -27,6 +27,7 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.exception.HoodieException;

+import org.apache.avro.Schema;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;

@@ -40,6 +41,7 @@
 public class CommitUtils {

   private static final Logger LOG = LogManager.getLogger(CommitUtils.class);
+  private static final String NULL_SCHEMA_STR = Schema.create(Schema.Type.NULL).toString();

   /**
    * Gets the commit action type for given write operation and table type.
@@ -84,7 +86,10 @@ public static HoodieCommitMetadata buildMetadata(List<HoodieWriteStat> writeStat
     if (extraMetadata.isPresent()) {
       extraMetadata.get().forEach(commitMetadata::addMetadata);
     }
-    commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaToStoreInCommit == null ? "" : schemaToStoreInCommit);
+    // NULL Schema should not be written to commit metadata
+    if (!(schemaToStoreInCommit != null && schemaToStoreInCommit.equals(NULL_SCHEMA_STR))) {
+      commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaToStoreInCommit == null ? "" : schemaToStoreInCommit);
+    }
     commitMetadata.setOperationType(operationType);
     return commitMetadata;
   }
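For readers skimming the diff, here is a minimal, self-contained sketch of how the new guard behaves. It is not part of the commit; the class and method names below are illustrative. Schema.create(Schema.Type.NULL).toString() is the JSON form of Avro's NULL schema, which is what an empty batch ends up reporting, so only that exact string is skipped; a null schema argument still falls through to the existing empty-string behaviour.

import org.apache.avro.Schema;

// Illustrative sketch only: mirrors the guard added to CommitUtils.buildMetadata.
public class NullSchemaGuardSketch {

  private static final String NULL_SCHEMA_STR = Schema.create(Schema.Type.NULL).toString();

  // Returns true when SCHEMA_KEY should be written into the commit metadata.
  static boolean shouldWriteSchemaKey(String schemaToStoreInCommit) {
    // Same boolean expression as the commit: skip only the Avro NULL schema.
    return !(schemaToStoreInCommit != null && schemaToStoreInCommit.equals(NULL_SCHEMA_STR));
  }

  public static void main(String[] args) {
    System.out.println(shouldWriteSchemaKey(null));             // true  -> stored as ""
    System.out.println(shouldWriteSchemaKey(
        "{\"type\":\"record\",\"name\":\"t\",\"fields\":[]}")); // true  -> schema stored as-is
    System.out.println(shouldWriteSchemaKey(NULL_SCHEMA_STR));  // false -> SCHEMA_KEY omitted
  }
}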

hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java

Lines changed: 32 additions & 1 deletion
@@ -70,6 +70,7 @@
 import org.apache.hudi.utilities.sources.ParquetDFSSource;
 import org.apache.hudi.utilities.sources.SqlSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
+import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch;
 import org.apache.hudi.utilities.testutils.JdbcTestUtils;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource;
@@ -130,6 +131,7 @@
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -1420,15 +1422,39 @@ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTrans
   }

   private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
+    testParquetDFSSource(useSchemaProvider, transformerClassNames, false);
+  }
+
+  private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch) throws Exception {
     prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null);
     String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
-        TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName()
+            : ParquetDFSSource.class.getName(),
             transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
             useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
     deltaStreamer.sync();
     TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
     testNum++;
+
+    if (testEmptyBatch) {
+      prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
+      // parquet source to return empty batch
+      TestParquetDFSSourceEmptyBatch.returnEmptyBatch = true;
+      deltaStreamer.sync();
+      TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
+
+      HoodieInstant lastInstant = metaClient.reloadActiveTimeline().getCommitsTimeline().lastInstant().get();
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+          .fromBytes(metaClient.getActiveTimeline().getInstantDetails(lastInstant).get(), HoodieCommitMetadata.class);
+      assertFalse(commitMetadata.getExtraMetadata().containsKey(HoodieCommitMetadata.SCHEMA_KEY));
+
+      // validate table schema fetches valid schema from last but one commit.
+      TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
+      assertNotEquals(tableSchemaResolver.getTableAvroSchema(), Schema.create(Schema.Type.NULL).toString());
+    }
   }

   private void testORCDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
@@ -1584,6 +1610,11 @@ public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws E
     testParquetDFSSource(false, null);
   }

+  @Test
+  public void testParquetDFSSourceForEmptyBatch() throws Exception {
+    testParquetDFSSource(false, null, true);
+  }
+
   @Test
   public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
     testParquetDFSSource(false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
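The new assertions rely on TableSchemaResolver falling back to the last commit that actually stored a schema. Below is a rough sketch of that check outside the test harness; the wrapper class, the basePath argument, and the bare Hadoop Configuration are assumptions for illustration, and it presumes an existing Hudi table whose latest commit came from an empty batch.

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;

// Illustrative sketch only: verifies that an empty-batch commit does not surface
// the Avro NULL schema as the table schema.
public class SchemaFallbackSketch {
  public static void main(String[] args) throws Exception {
    String basePath = args[0]; // base path of a Hudi table; latest commit produced by an empty batch
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setBasePath(basePath)
        .setConf(new Configuration())
        .build();
    // Since the empty-batch commit no longer carries SCHEMA_KEY, the resolver is expected
    // to return the schema of an earlier commit rather than the NULL schema.
    TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
    String resolved = resolver.getTableAvroSchema().toString();
    if (resolved.equals(Schema.create(Schema.Type.NULL).toString())) {
      throw new IllegalStateException("Table schema resolved to the NULL schema");
    }
    System.out.println("Resolved table schema: " + resolved);
  }
}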
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSourceEmptyBatch.java

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+public class TestParquetDFSSourceEmptyBatch extends ParquetDFSSource {
+
+  public static boolean returnEmptyBatch;
+
+  public TestParquetDFSSourceEmptyBatch(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+  }
+
+  @Override
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
+    Pair<Option<Dataset<Row>>, String> toReturn = super.fetchNextBatch(lastCkptStr, sourceLimit);
+    if (returnEmptyBatch) {
+      return Pair.of(Option.empty(), toReturn.getRight());
+    }
+    return toReturn;
+  }
+}
