From 3fc2b55c7932364d89139478b17208bd644c2793 Mon Sep 17 00:00:00 2001
From: xiaoxingstack
Date: Thu, 8 Sep 2022 16:42:42 +0800
Subject: [PATCH 1/4] [HUDI-4813] Fix key generator inference not working on
 the Spark SQL side

---
 .../main/scala/org/apache/hudi/DataSourceOptions.scala  | 6 +++++-
 .../spark/sql/catalyst/catalog/HoodieCatalogTable.scala | 9 ++++++---
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index c694174b8c79a..83092146c8ffb 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -787,9 +787,13 @@ object DataSourceOptionsHelper {
 
   def inferKeyGenClazz(props: TypedProperties): String = {
     val partitionFields = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), null)
+    val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key(), DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
+    genKeyGenerator(recordsKeyFields, partitionFields)
+  }
+
+  def genKeyGenerator(recordsKeyFields: String, partitionFields: String): String = {
     if (partitionFields != null) {
       val numPartFields = partitionFields.split(",").length
-      val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key(), DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
       val numRecordKeyFields = recordsKeyFields.split(",").length
       if (numPartFields == 1 && numRecordKeyFields == 1) {
         classOf[SimpleKeyGenerator].getName
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 09981e845a108..0b9c04e481a04 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -17,19 +17,19 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
-import org.apache.hudi.AvroConversionUtils
 import org.apache.hudi.DataSourceWriteOptions.OPERATION
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.{StringUtils, ValidationUtils}
-import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hudi.HoodieOptionConfig
+import org.apache.spark.sql.hudi.HoodieOptionConfig.SQL_KEY_TABLE_PRIMARY_KEY
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -288,7 +288,10 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
       HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
         originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
     } else {
-      extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = classOf[ComplexKeyGenerator].getCanonicalName
+      val primaryKeys = table.properties.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName).get
+      val partitions = table.partitionColumnNames.mkString(",")
+      extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
+        DataSourceOptionsHelper.genKeyGenerator(primaryKeys, partitions)
     }
     extraConfig.toMap
   }
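Patch 1 extracts the inference rule out of inferKeyGenClazz(props) into a reusable genKeyGenerator(recordKeys, partitions), so the Spark SQL catalog path (HoodieCatalogTable) can apply the same rule instead of unconditionally defaulting to ComplexKeyGenerator. The rule: no partition field selects NonpartitionedKeyGenerator (this branch is below the hunk shown, but the tests in patch 2 pin it down), exactly one record key plus one partition field selects SimpleKeyGenerator, and anything else selects ComplexKeyGenerator. A minimal standalone sketch of that rule, with class names spelled out as strings so it runs without Hudi on the classpath (the real code uses classOf[...].getName):

object KeyGenInferenceSketch {
  def inferKeyGenClazz(recordKeyFields: String, partitionFields: String): String = {
    if (partitionFields == null || partitionFields.isEmpty) {
      // No partition columns: records live directly under the table base path.
      "org.apache.hudi.keygen.NonpartitionedKeyGenerator"
    } else if (partitionFields.split(",").length == 1 && recordKeyFields.split(",").length == 1) {
      // Exactly one record key and one partition column.
      "org.apache.hudi.keygen.SimpleKeyGenerator"
    } else {
      // Composite record key and/or multi-column partitioning.
      "org.apache.hudi.keygen.ComplexKeyGenerator"
    }
  }

  def main(args: Array[String]): Unit = {
    assert(inferKeyGenClazz("id", null) == "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
    assert(inferKeyGenClazz("id", "ts") == "org.apache.hudi.keygen.SimpleKeyGenerator")
    assert(inferKeyGenClazz("id,name", "ts") == "org.apache.hudi.keygen.ComplexKeyGenerator")
    assert(inferKeyGenClazz("id", "year,month") == "org.apache.hudi.keygen.ComplexKeyGenerator")
  }
}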
From 7d339ae273daf057d4fd809b28bf1b9a31d5da8d Mon Sep 17 00:00:00 2001
From: xiaoxingstack
Date: Fri, 9 Sep 2022 15:54:32 +0800
Subject: [PATCH 2/4] Add tests and fix inference for non-partitioned tables

---
 .../org/apache/hudi/DataSourceOptions.scala   |  8 +-
 .../catalyst/catalog/HoodieCatalogTable.scala |  2 +-
 .../spark/sql/hudi/TestCreateTable.scala      | 74 ++++++++++++++++++-
 3 files changed, 77 insertions(+), 7 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 83092146c8ffb..e8ffb09ff9100 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.common.config.{ConfigProperty, DFSPropertiesConfiguration
 import org.apache.hudi.common.fs.ConsistencyGuardConfig
 import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
-import org.apache.hudi.common.util.Option
+import org.apache.hudi.common.util.{Option, StringUtils}
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, HiveSyncTool}
@@ -788,11 +788,11 @@ object DataSourceOptionsHelper {
   def inferKeyGenClazz(props: TypedProperties): String = {
     val partitionFields = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), null)
     val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key(), DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
-    genKeyGenerator(recordsKeyFields, partitionFields)
+    inferKeyGenClazz(recordsKeyFields, partitionFields)
   }
 
-  def genKeyGenerator(recordsKeyFields: String, partitionFields: String): String = {
-    if (partitionFields != null) {
+  def inferKeyGenClazz(recordsKeyFields: String, partitionFields: String): String = {
+    if (!StringUtils.isNullOrEmpty(partitionFields)) {
       val numPartFields = partitionFields.split(",").length
       val numRecordKeyFields = recordsKeyFields.split(",").length
       if (numPartFields == 1 && numRecordKeyFields == 1) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 0b9c04e481a04..2cf6cc04fb73d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -291,7 +291,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
       val primaryKeys = table.properties.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName).get
       val partitions = table.partitionColumnNames.mkString(",")
       extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
-        DataSourceOptionsHelper.genKeyGenerator(primaryKeys, partitions)
+        DataSourceOptionsHelper.inferKeyGenClazz(primaryKeys, partitions)
     }
     extraConfig.toMap
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index d3dbf9a6e6aab..029b3d1878dd3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -28,8 +28,7 @@ import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.types._
-
-import org.junit.jupiter.api.Assertions.assertFalse
+import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
 
 import scala.collection.JavaConverters._
 
@@ -944,4 +943,75 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
 
     spark.sql("use default")
   }
+
+  test("Test Infer KeyGenClazz") {
+    def checkKeyGenerator(targetGenerator: String, tableName: String) = {
+      val tablePath = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).location.getPath
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(tablePath)
+        .setConf(spark.sessionState.newHadoopConf())
+        .build()
+      val realKeyGenerator =
+        metaClient.getTableConfig.getProps.asScala.toMap.get(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key).get
+      assertResult(targetGenerator)(realKeyGenerator)
+    }
+
+    val tableName = generateTableName
+
+    // Test non-partitioned table
+    spark.sql(
+      s"""
+         | create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  ts long
+         | ) using hudi
+         | comment "This is a simple hudi table"
+         | tblproperties (
+         |   primaryKey = 'id',
+         |   preCombineField = 'ts'
+         | )
+       """.stripMargin)
+    checkKeyGenerator("org.apache.hudi.keygen.NonpartitionedKeyGenerator", tableName)
+    spark.sql(s"drop table $tableName")
+
+    // Test single-partitioned table
+    spark.sql(
+      s"""
+         | create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  ts long
+         | ) using hudi
+         | comment "This is a simple hudi table"
+         | partitioned by (ts)
+         | tblproperties (
+         |   primaryKey = 'id',
+         |   preCombineField = 'ts'
+         | )
+       """.stripMargin)
+    checkKeyGenerator("org.apache.hudi.keygen.SimpleKeyGenerator", tableName)
+    spark.sql(s"drop table $tableName")
+
+    // Test single-partitioned table with a composite record key
+    spark.sql(
+      s"""
+         | create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  ts long
+         | ) using hudi
+         | comment "This is a simple hudi table"
+         | partitioned by (ts)
+         | tblproperties (
+         |   primaryKey = 'id,name',
+         |   preCombineField = 'ts'
+         | )
+       """.stripMargin)
+    checkKeyGenerator("org.apache.hudi.keygen.ComplexKeyGenerator", tableName)
+    spark.sql(s"drop table $tableName")
+  }
 }
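Patch 2 renames the helper to an inferKeyGenClazz overload and, more importantly, replaces the bare null check with !StringUtils.isNullOrEmpty(partitionFields). This matters because the SQL path hands over table.partitionColumnNames.mkString(","), which for a non-partitioned table is the empty string rather than null; under a null-only check, "".split(",") would count one partition field and wrongly select SimpleKeyGenerator. A small self-contained sketch of the failure mode (isNullOrEmpty is re-implemented locally here, mirroring Hudi's StringUtils, so the snippet runs stand-alone):

object EmptyPartitionGuardSketch {
  // Local stand-in for org.apache.hudi.common.util.StringUtils.isNullOrEmpty.
  def isNullOrEmpty(s: String): Boolean = s == null || s.isEmpty

  def main(args: Array[String]): Unit = {
    // A non-partitioned SQL table yields "" (not null) here.
    val partitions = Seq.empty[String].mkString(",")
    assert(partitions != null)                // the old null-only guard would not catch this...
    assert(partitions.split(",").length == 1) // ..."".split(",") still counts one "field"
    assert(isNullOrEmpty(partitions))         // the new guard treats "" as "no partitions"
  }
}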
From 5d136198f66dccf2a36f955196dd572846937f87 Mon Sep 17 00:00:00 2001
From: xiaoxingstack
Date: Tue, 13 Sep 2022 11:14:23 +0800
Subject: [PATCH 3/4] Fix missing default primary key in unit tests

---
 .../apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala | 2 +-
 .../test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 2cf6cc04fb73d..f1357723200d3 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -288,7 +288,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
       HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
         originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
     } else {
-      val primaryKeys = table.properties.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName).get
+      val primaryKeys = table.properties.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName).getOrElse(SQL_KEY_TABLE_PRIMARY_KEY.defaultValue.get)
       val partitions = table.partitionColumnNames.mkString(",")
       extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
         DataSourceOptionsHelper.inferKeyGenClazz(primaryKeys, partitions)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 93469f2796cf9..d3640474b252c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -872,7 +872,7 @@ class TestHoodieSparkSqlWriter {
       .setBasePath(tablePath1).build().getTableConfig
     assert(tableConfig1.getHiveStylePartitioningEnable == "true")
     assert(tableConfig1.getUrlEncodePartitioning == "false")
-    assert(tableConfig1.getKeyGeneratorClassName == classOf[ComplexKeyGenerator].getName)
+    assert(tableConfig1.getKeyGeneratorClassName == classOf[SimpleKeyGenerator].getName)
     df.write.format("hudi")
       .options(options)
       .option(HoodieWriteConfig.TBL_NAME.key, tableName1)
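Patch 3 replaces .get on the primaryKey table property with getOrElse over the option's declared default, because tables created without an explicit primaryKey would otherwise throw NoSuchElementException during key generator inference. A minimal sketch of the difference; the "uuid" fallback stands in for SQL_KEY_TABLE_PRIMARY_KEY.defaultValue and is an assumption of this sketch, not a value taken from the patch:

object DefaultPrimaryKeySketch {
  def main(args: Array[String]): Unit = {
    // Table properties for a table created without an explicit primaryKey.
    val props: Map[String, String] = Map("preCombineField" -> "ts")

    // Pre-patch behavior: Option.get on a missing entry throws NoSuchElementException.
    assert(scala.util.Try(props.get("primaryKey").get).isFailure)

    // Post-patch behavior: fall back to the option's default instead of throwing.
    val primaryKeys = props.getOrElse("primaryKey", "uuid") // "uuid" is assumed here
    assert(primaryKeys == "uuid")
  }
}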
From f32cf8515a4984c75bc26641abd3a5b042ebe372 Mon Sep 17 00:00:00 2001
From: xiaoxingstack
Date: Wed, 14 Sep 2022 15:10:39 +0800
Subject: [PATCH 4/4] Fix legacy test expectations to follow the Spark key
 generator inference

---
 .../scala/org/apache/spark/sql/hudi/TestCreateTable.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 029b3d1878dd3..6a6b41da7fb73 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.types._
-import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
+import org.junit.jupiter.api.Assertions.assertFalse
 
 import scala.collection.JavaConverters._
 
@@ -136,7 +136,7 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     assertResult("dt")(tableConfig(HoodieTableConfig.PARTITION_FIELDS.key))
     assertResult("id")(tableConfig(HoodieTableConfig.RECORDKEY_FIELDS.key))
     assertResult("ts")(tableConfig(HoodieTableConfig.PRECOMBINE_FIELD.key))
-    assertResult(classOf[ComplexKeyGenerator].getCanonicalName)(tableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
+    assertResult(classOf[SimpleKeyGenerator].getCanonicalName)(tableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
     assertResult("default")(tableConfig(HoodieTableConfig.DATABASE_NAME.key()))
     assertResult(tableName)(tableConfig(HoodieTableConfig.NAME.key()))
     assertFalse(tableConfig.contains(OPERATION.key()))
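Patch 4 updates the remaining legacy expectation: a table with a single primary key 'id' and a single partition column 'dt' now persists SimpleKeyGenerator rather than ComplexKeyGenerator, matching the inference rule introduced in patch 1. To confirm what any table created after this series actually persisted, the calls from the checkKeyGenerator test helper above can be reused in a Hudi-enabled spark-shell. The following is a sketch under that assumption (the function name is illustrative; the API calls are the same ones the tests use):

import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier

import scala.collection.JavaConverters._

// Reads the persisted HoodieTableConfig.KEY_GENERATOR_CLASS_NAME entry of a
// catalog table, mirroring the checkKeyGenerator helper in the tests above.
def persistedKeyGenerator(spark: SparkSession, tableName: String): Option[String] = {
  val tablePath = spark.sessionState.catalog
    .getTableMetadata(TableIdentifier(tableName)).location.getPath
  val metaClient = HoodieTableMetaClient.builder()
    .setBasePath(tablePath)
    .setConf(spark.sessionState.newHadoopConf())
    .build()
  metaClient.getTableConfig.getProps.asScala.toMap
    .get(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
}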