Skip to content

Commit 3faddb7

Browse files
TJX2014 and xiaoxingstack authored
[HUDI-4813] Fix infer keygen not work in sparksql side issue (apache#6634)
* [HUDI-4813] Fix infer keygen not work in sparksql side issue Co-authored-by: xiaoxingstack <xiaoxingstack@didiglobal.com>
1 parent f70678f commit 3faddb7

4 files changed

Lines changed: 86 additions & 9 deletions

File tree

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import org.apache.hudi.common.config.{ConfigProperty, DFSPropertiesConfiguration
2323
import org.apache.hudi.common.fs.ConsistencyGuardConfig
2424
import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
2525
import org.apache.hudi.common.table.HoodieTableConfig
26-
import org.apache.hudi.common.util.Option
26+
import org.apache.hudi.common.util.{Option, StringUtils}
2727
import org.apache.hudi.common.util.ValidationUtils.checkState
2828
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
2929
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, HiveSyncTool}
@@ -787,9 +787,13 @@ object DataSourceOptionsHelper {
787787

788788
def inferKeyGenClazz(props: TypedProperties): String = {
789789
val partitionFields = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), null)
790-
if (partitionFields != null) {
790+
val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key(), DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
791+
inferKeyGenClazz(recordsKeyFields, partitionFields)
792+
}
793+
794+
def inferKeyGenClazz(recordsKeyFields: String, partitionFields: String): String = {
795+
if (!StringUtils.isNullOrEmpty(partitionFields)) {
791796
val numPartFields = partitionFields.split(",").length
792-
val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key(), DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
793797
val numRecordKeyFields = recordsKeyFields.split(",").length
794798
if (numPartFields == 1 && numRecordKeyFields == 1) {
795799
classOf[SimpleKeyGenerator].getName

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,19 @@
1717

1818
package org.apache.spark.sql.catalyst.catalog
1919

20-
import org.apache.hudi.AvroConversionUtils
2120
import org.apache.hudi.DataSourceWriteOptions.OPERATION
2221
import org.apache.hudi.HoodieWriterUtils._
2322
import org.apache.hudi.common.config.DFSPropertiesConfiguration
2423
import org.apache.hudi.common.model.HoodieTableType
2524
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
2625
import org.apache.hudi.common.util.{StringUtils, ValidationUtils}
27-
import org.apache.hudi.keygen.ComplexKeyGenerator
2826
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
27+
import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper}
2928
import org.apache.spark.internal.Logging
3029
import org.apache.spark.sql.avro.SchemaConverters
3130
import org.apache.spark.sql.catalyst.TableIdentifier
3231
import org.apache.spark.sql.hudi.HoodieOptionConfig
32+
import org.apache.spark.sql.hudi.HoodieOptionConfig.SQL_KEY_TABLE_PRIMARY_KEY
3333
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
3434
import org.apache.spark.sql.types.{StructField, StructType}
3535
import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -288,7 +288,10 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
288288
HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
289289
originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
290290
} else {
291-
extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = classOf[ComplexKeyGenerator].getCanonicalName
291+
val primaryKeys = table.properties.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName).getOrElse(SQL_KEY_TABLE_PRIMARY_KEY.defaultValue.get)
292+
val partitions = table.partitionColumnNames.mkString(",")
293+
extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
294+
DataSourceOptionsHelper.inferKeyGenClazz(primaryKeys, partitions)
292295
}
293296
extraConfig.toMap
294297
}

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -874,7 +874,7 @@ class TestHoodieSparkSqlWriter {
874874
.setBasePath(tablePath1).build().getTableConfig
875875
assert(tableConfig1.getHiveStylePartitioningEnable == "true")
876876
assert(tableConfig1.getUrlEncodePartitioning == "false")
877-
assert(tableConfig1.getKeyGeneratorClassName == classOf[ComplexKeyGenerator].getName)
877+
assert(tableConfig1.getKeyGeneratorClassName == classOf[SimpleKeyGenerator].getName)
878878
df.write.format("hudi")
879879
.options(options)
880880
.option(HoodieWriteConfig.TBL_NAME.key, tableName1)

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import org.apache.spark.sql.SaveMode
2828
import org.apache.spark.sql.catalyst.TableIdentifier
2929
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
3030
import org.apache.spark.sql.types._
31-
3231
import org.junit.jupiter.api.Assertions.assertFalse
3332

3433
import scala.collection.JavaConverters._
@@ -137,7 +136,7 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
137136
assertResult("dt")(tableConfig(HoodieTableConfig.PARTITION_FIELDS.key))
138137
assertResult("id")(tableConfig(HoodieTableConfig.RECORDKEY_FIELDS.key))
139138
assertResult("ts")(tableConfig(HoodieTableConfig.PRECOMBINE_FIELD.key))
140-
assertResult(classOf[ComplexKeyGenerator].getCanonicalName)(tableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
139+
assertResult(classOf[SimpleKeyGenerator].getCanonicalName)(tableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
141140
assertResult("default")(tableConfig(HoodieTableConfig.DATABASE_NAME.key()))
142141
assertResult(tableName)(tableConfig(HoodieTableConfig.NAME.key()))
143142
assertFalse(tableConfig.contains(OPERATION.key()))
@@ -944,4 +943,75 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
944943

945944
spark.sql("use default")
946945
}
946+
947+
test("Test Infer KegGenClazz") {
948+
def checkKeyGenerator(targetGenerator: String, tableName: String) = {
949+
val tablePath = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).location.getPath
950+
val metaClient = HoodieTableMetaClient.builder()
951+
.setBasePath(tablePath)
952+
.setConf(spark.sessionState.newHadoopConf())
953+
.build()
954+
val realKeyGenerator =
955+
metaClient.getTableConfig.getProps.asScala.toMap.get(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key).get
956+
assertResult(targetGenerator)(realKeyGenerator)
957+
}
958+
959+
val tableName = generateTableName
960+
961+
// Test Nonpartitioned table
962+
spark.sql(
963+
s"""
964+
| create table $tableName (
965+
| id int,
966+
| name string,
967+
| price double,
968+
| ts long
969+
| ) using hudi
970+
| comment "This is a simple hudi table"
971+
| tblproperties (
972+
| primaryKey = 'id',
973+
| preCombineField = 'ts'
974+
| )
975+
""".stripMargin)
976+
checkKeyGenerator("org.apache.hudi.keygen.NonpartitionedKeyGenerator", tableName)
977+
spark.sql(s"drop table $tableName")
978+
979+
// Test single partitioned table
980+
spark.sql(
981+
s"""
982+
| create table $tableName (
983+
| id int,
984+
| name string,
985+
| price double,
986+
| ts long
987+
| ) using hudi
988+
| comment "This is a simple hudi table"
989+
| partitioned by (ts)
990+
| tblproperties (
991+
| primaryKey = 'id',
992+
| preCombineField = 'ts'
993+
| )
994+
""".stripMargin)
995+
checkKeyGenerator("org.apache.hudi.keygen.SimpleKeyGenerator", tableName)
996+
spark.sql(s"drop table $tableName")
997+
998+
// Test single partitioned dual record keys table
999+
spark.sql(
1000+
s"""
1001+
| create table $tableName (
1002+
| id int,
1003+
| name string,
1004+
| price double,
1005+
| ts long
1006+
| ) using hudi
1007+
| comment "This is a simple hudi table"
1008+
| partitioned by (ts)
1009+
| tblproperties (
1010+
| primaryKey = 'id,name',
1011+
| preCombineField = 'ts'
1012+
| )
1013+
""".stripMargin)
1014+
checkKeyGenerator("org.apache.hudi.keygen.ComplexKeyGenerator", tableName)
1015+
spark.sql(s"drop table $tableName")
1016+
}
9471017
}

0 commit comments

Comments (0)