Skip to content

Commit a3921a8

Browse files
authored
[HUDI-3403] Ensure keygen props are set for bootstrap (#6645)
1 parent c22568e commit a3921a8

4 files changed

Lines changed: 43 additions & 31 deletions

File tree

hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -876,11 +876,10 @@ public PropertyBuilder setInflightMetadataPartitions(String partitions) {
876876
return this;
877877
}
878878

879-
public PropertyBuilder set(String key, Object value) {
879+
private void set(String key, Object value) {
880880
if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) {
881881
this.others.put(key, value);
882882
}
883-
return this;
884883
}
885884

886885
public PropertyBuilder set(Map<String, Object> props) {

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ import org.apache.hudi.common.fs.FSUtils
3131
import org.apache.hudi.common.model._
3232
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
3333
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
34-
import org.apache.hudi.common.util.{CommitUtils, Functions, StringUtils}
35-
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
34+
import org.apache.hudi.common.util.{CommitUtils, StringUtils}
35+
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME, KEYGEN_CLASS_NAME}
3636
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
3737
import org.apache.hudi.exception.HoodieException
3838
import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
@@ -469,7 +469,10 @@ object HoodieSparkSqlWriter {
469469
val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
470470
val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
471471
val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
472-
val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
472+
val keyGenProp =
473+
if (StringUtils.nonEmpty(hoodieConfig.getString(KEYGEN_CLASS_NAME))) hoodieConfig.getString(KEYGEN_CLASS_NAME)
474+
else hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
475+
val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenProp, parameters)
473476
val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(
474477
HoodieTableConfig.POPULATE_META_FIELDS.key(),
475478
String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())
@@ -493,6 +496,7 @@ object HoodieSparkSqlWriter {
493496
.setPartitionFields(partitionColumns)
494497
.setPopulateMetaFields(populateMetaFields)
495498
.setKeyGeneratorClassProp(keyGenProp)
499+
.set(timestampKeyGeneratorConfigs)
496500
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
497501
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
498502
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717

1818
package org.apache.hudi
1919

20-
import java.io.IOException
21-
import java.time.Instant
22-
import java.util.{Collections, Date, UUID}
2320
import org.apache.commons.io.FileUtils
2421
import org.apache.hudi.DataSourceWriteOptions._
2522
import org.apache.hudi.HoodieSparkUtils.gteqSpark3_0
@@ -43,12 +40,15 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue,
4340
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
4441
import org.junit.jupiter.params.ParameterizedTest
4542
import org.junit.jupiter.params.provider.Arguments.arguments
46-
import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, MethodSource, ValueSource}
43+
import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, ValueSource}
4744
import org.mockito.ArgumentMatchers.any
4845
import org.mockito.Mockito.{spy, times, verify}
4946
import org.scalatest.Assertions.assertThrows
5047
import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, intercept}
5148

49+
import java.io.IOException
50+
import java.time.Instant
51+
import java.util.{Collections, Date, UUID}
5252
import scala.collection.JavaConversions._
5353
import scala.collection.JavaConverters
5454

@@ -508,7 +508,7 @@ class TestHoodieSparkSqlWriter {
508508
val records = DataSourceTestUtils.generateRandomRows(100)
509509
val recordsSeq = convertRowListToSeq(records)
510510
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
511-
initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false)
511+
initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false, initBasePath = true)
512512
val client = spy(DataSourceUtils.createHoodieClient(
513513
new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieFooTableName,
514514
mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
@@ -565,7 +565,7 @@ class TestHoodieSparkSqlWriter {
565565
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
566566
HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getCanonicalName)
567567
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
568-
initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true)
568+
initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true, initBasePath = false)
569569

570570
val client = spy(DataSourceUtils.createHoodieClient(
571571
new JavaSparkContext(sc),
@@ -593,7 +593,7 @@ class TestHoodieSparkSqlWriter {
593593
}
594594
}
595595

596-
def initializeMetaClientForBootstrap(fooTableParams : Map[String, String], tableType: String, addBootstrapPath : Boolean) : Unit = {
596+
def initializeMetaClientForBootstrap(fooTableParams : Map[String, String], tableType: String, addBootstrapPath : Boolean, initBasePath: Boolean) : Unit = {
597597
// when metadata is enabled, directly instantiating write client using DataSourceUtils.createHoodieClient
598598
// will hit a code which tries to instantiate meta client for data table. if table does not exist, it fails.
599599
// hence doing an explicit instantiation here.
@@ -612,7 +612,9 @@ class TestHoodieSparkSqlWriter {
612612
tableMetaClientBuilder
613613
.setBootstrapBasePath(fooTableParams(HoodieBootstrapConfig.BASE_PATH.key))
614614
}
615-
tableMetaClientBuilder.initTable(sc.hadoopConfiguration, tempBasePath)
615+
if (initBasePath) {
616+
tableMetaClientBuilder.initTable(sc.hadoopConfiguration, tempBasePath)
617+
}
616618
}
617619

618620
/**

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,17 @@ import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
2323
import org.apache.hudi.common.fs.FSUtils
2424
import org.apache.hudi.common.table.timeline.HoodieTimeline
2525
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieCompactionConfig, HoodieWriteConfig}
26-
import org.apache.hudi.keygen.SimpleKeyGenerator
26+
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
2727
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
2828
import org.apache.spark.api.java.JavaSparkContext
2929
import org.apache.spark.sql.functions.{col, lit}
3030
import org.apache.spark.sql.{SaveMode, SparkSession}
3131
import org.junit.jupiter.api.Assertions.assertEquals
3232
import org.junit.jupiter.api.io.TempDir
3333
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
34+
3435
import java.time.Instant
3536
import java.util.Collections
36-
3737
import scala.collection.JavaConverters._
3838

3939
class TestDataSourceForBootstrap {
@@ -102,9 +102,12 @@ class TestDataSourceForBootstrap {
102102
.save(srcPath)
103103

104104
// Perform bootstrap
105+
val bootstrapKeygenClass = classOf[NonpartitionedKeyGenerator].getName
106+
val options = commonOpts.-(DataSourceWriteOptions.PARTITIONPATH_FIELD.key)
105107
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
106108
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
107-
extraOpts = Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
109+
extraOpts = options ++ Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> bootstrapKeygenClass),
110+
bootstrapKeygenClass = bootstrapKeygenClass
108111
)
109112
// check marked directory clean up
110113
assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
@@ -123,10 +126,10 @@ class TestDataSourceForBootstrap {
123126

124127
updateDF.write
125128
.format("hudi")
126-
.options(commonOpts)
129+
.options(options)
127130
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
128131
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
129-
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
132+
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, bootstrapKeygenClass)
130133
.mode(SaveMode.Append)
131134
.save(basePath)
132135

@@ -163,8 +166,8 @@ class TestDataSourceForBootstrap {
163166
// Perform bootstrap
164167
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
165168
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
166-
Some("datestr"),
167-
Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"))
169+
commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") ++ Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"),
170+
classOf[SimpleKeyGenerator].getName)
168171

169172
// check marked directory clean up
170173
assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
@@ -227,7 +230,9 @@ class TestDataSourceForBootstrap {
227230

228231
// Perform bootstrap
229232
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
230-
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, Some("datestr"))
233+
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
234+
commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
235+
classOf[SimpleKeyGenerator].getName)
231236

232237
// Read bootstrapped table and verify count using glob path
233238
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
@@ -302,7 +307,9 @@ class TestDataSourceForBootstrap {
302307

303308
// Perform bootstrap
304309
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
305-
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, Some("datestr"))
310+
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
311+
commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
312+
classOf[SimpleKeyGenerator].getName)
306313

307314
// Read bootstrapped table and verify count
308315
val hoodieROViewDF1 = spark.read.format("hudi")
@@ -367,7 +374,9 @@ class TestDataSourceForBootstrap {
367374

368375
// Perform bootstrap
369376
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
370-
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, Some("datestr"))
377+
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
378+
commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
379+
classOf[SimpleKeyGenerator].getName)
371380

372381
// Read bootstrapped table and verify count
373382
val hoodieROViewDF1 = spark.read.format("hudi")
@@ -497,18 +506,16 @@ class TestDataSourceForBootstrap {
497506
}
498507

499508
def runMetadataBootstrapAndVerifyCommit(tableType: String,
500-
partitionColumns: Option[String] = None,
501-
extraOpts: Map[String, String] = Map.empty): String = {
509+
extraOpts: Map[String, String] = Map.empty,
510+
bootstrapKeygenClass: String): String = {
502511
val bootstrapDF = spark.emptyDataFrame
503512
bootstrapDF.write
504513
.format("hudi")
505-
.options(commonOpts)
506514
.options(extraOpts)
507515
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
508516
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType)
509-
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, partitionColumns.getOrElse(""))
510517
.option(HoodieBootstrapConfig.BASE_PATH.key, srcPath)
511-
.option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
518+
.option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, bootstrapKeygenClass)
512519
.mode(SaveMode.Overwrite)
513520
.save(basePath)
514521

@@ -528,7 +535,7 @@ class TestDataSourceForBootstrap {
528535
.load(basePath)
529536

530537
assertEquals(numRecords, hoodieIncViewDF1.count())
531-
var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
538+
var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
532539
assertEquals(1, countsPerCommit.length)
533540
assertEquals(bootstrapCommitInstantTime, countsPerCommit(0).get(0))
534541

@@ -537,10 +544,10 @@ class TestDataSourceForBootstrap {
537544
val hoodieIncViewDF2 = spark.read.format("hudi")
538545
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
539546
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, bootstrapCommitInstantTime)
540-
.load(basePath);
547+
.load(basePath)
541548

542549
assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
543-
countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
550+
countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
544551
assertEquals(1, countsPerCommit.length)
545552
assertEquals(latestCommitInstantTime, countsPerCommit(0).get(0))
546553

0 commit comments

Comments (0)