diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 9ea61f1d3669..56862be8034e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -518,9 +518,13 @@ public Option getPartitionFields() {
 
   /**
    * @returns the partition field prop.
+   * @deprecated please use {@link #getPartitionFields()} instead
    */
+  @Deprecated
   public String getPartitionFieldProp() {
-    return getString(PARTITION_FIELDS);
+    // NOTE: We're adding a stub returning empty string to stay compatible w/ pre-existing
+    //       behavior until this method is fully deprecated
+    return Option.ofNullable(getString(PARTITION_FIELDS)).orElse("");
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 92c8f2a23609..57cab09377fa 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -31,6 +31,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.sync.common.util.ConfigUtils
+import org.apache.hudi.util.JFunction
 import org.apache.log4j.LogManager
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils => SparkDataSourceUtils}
 
@@ -331,14 +332,14 @@ object DataSourceWriteOptions {
   /**
    * Key generator class, that implements will extract the key out of incoming record.
    */
-  val keyGeneraterInferFunc = DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) => {
-    Option.of(DataSourceOptionsHelper.inferKeyGenClazz(p.getProps))
+  val keyGeneratorInferFunc = JFunction.toJavaFunction((config: HoodieConfig) => {
+    Option.of(DataSourceOptionsHelper.inferKeyGenClazz(config.getProps))
   })
 
   val KEYGENERATOR_CLASS_NAME: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.keygenerator.class")
     .defaultValue(classOf[SimpleKeyGenerator].getName)
-    .withInferFunction(keyGeneraterInferFunc)
+    .withInferFunction(keyGeneratorInferFunc)
     .withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator`")
 
   val KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED: ConfigProperty[String] = KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED
@@ -811,12 +812,6 @@ object DataSourceOptionsHelper {
     }
   }
 
-  implicit def scalaFunctionToJavaFunction[From, To](function: (From) => To): JavaFunction[From, To] = {
-    new JavaFunction[From, To] {
-      override def apply (input: From): To = function (input)
-    }
-  }
-
   implicit def convert[T, U](prop: ConfigProperty[T])(implicit converter: T => U): ConfigProperty[U] = {
     checkState(prop.hasDefaultValue)
     var newProp: ConfigProperty[U] = ConfigProperty.key(prop.key())
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index aa29bad6b03d..335fe68bd209 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -112,7 +112,7 @@ object HoodieWriterUtils {
   def getOriginKeyGenerator(parameters: Map[String, String]): String = {
     val kg = parameters.getOrElse(KEYGENERATOR_CLASS_NAME.key(), null)
     if (classOf[SqlKeyGenerator].getCanonicalName == kg) {
-      parameters.getOrElse(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null)
+      parameters.getOrElse(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME, null)
     } else {
       kg
     }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index f1357723200d..c31cd3b20565 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -190,12 +190,18 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
     } else {
       val (recordName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(table.identifier.table)
       val schema = SchemaConverters.toAvroType(finalSchema, false, recordName, namespace)
+      val partitionColumns = if (table.partitionColumnNames.isEmpty) {
+        null
+      } else {
+        table.partitionColumnNames.mkString(",")
+      }
+
       HoodieTableMetaClient.withPropertyBuilder()
         .fromProperties(properties)
         .setDatabaseName(catalogDatabaseName)
         .setTableName(table.identifier.table)
         .setTableCreateSchema(schema.toString())
-        .setPartitionFields(table.partitionColumnNames.mkString(","))
+        .setPartitionFields(partitionColumns)
         .initTable(hadoopConf, tableLocation)
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index f3f7e6649059..332455ea217e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -69,7 +69,7 @@ trait ProvidesHoodieConfig extends Logging {
       HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
       URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
       KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-      SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
       OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
       PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
       HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
@@ -180,7 +180,7 @@ trait ProvidesHoodieConfig extends Logging {
       HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
       URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
       KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-      SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
       RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
       PRECOMBINE_FIELD.key -> preCombineField,
       PARTITIONPATH_FIELD.key -> partitionFieldsStr,
@@ -264,7 +264,7 @@ trait ProvidesHoodieConfig extends Logging {
       HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
       URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
       KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-      SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
       OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
       PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
       HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index 798ed84b0939..01c995fed437 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -18,24 +18,28 @@
 package org.apache.spark.sql.hudi.command
 
 import org.apache.avro.generic.GenericRecord
-import org.apache.hudi.DataSourceOptionsHelper
 import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieKey
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
+import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.{StructType, TimestampType}
+import org.apache.spark.unsafe.types.UTF8String
 import org.joda.time.format.DateTimeFormat
 
 import java.sql.Timestamp
+import java.util
+import java.util.Collections
 import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS}
 
 /**
- * A complex key generator for sql command which do some process for the
- * timestamp data type partition field.
+ * Custom Spark-specific [[KeyGenerator]] overriding behavior handling [[TimestampType]] partition values
  */
-class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props) {
+class SqlKeyGenerator(props: TypedProperties) extends BuiltinKeyGenerator(props) {
 
   private lazy val partitionSchema = {
     val partitionSchema = props.getString(SqlKeyGenerator.PARTITION_SCHEMA, "")
@@ -45,37 +49,91 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
       None
     }
   }
-  // The origin key generator class for this table.
-  private lazy val originKeyGen = {
-    val beforeKeyGenClassName = props.getString(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null)
-    if (beforeKeyGenClassName != null && beforeKeyGenClassName.nonEmpty) {
-      val keyGenProps = new TypedProperties()
-      keyGenProps.putAll(props)
-      keyGenProps.remove(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME)
-      val convertedKeyGenClassName = SqlKeyGenerator.getRealKeyGenClassName(props)
-      keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, convertedKeyGenClassName)
-      Some(KeyGenUtils.createKeyGeneratorByClassName(keyGenProps))
-    } else {
-      None
+
+  private lazy val complexKeyGen = new ComplexKeyGenerator(props)
+  private lazy val originalKeyGen =
+    Option(props.getString(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME, null))
+      .map { originalKeyGenClassName =>
+        checkArgument(originalKeyGenClassName.nonEmpty)
+
+        val convertedKeyGenClassName = HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(originalKeyGenClassName)
+
+        val keyGenProps = new TypedProperties()
+        keyGenProps.putAll(props)
+        keyGenProps.remove(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME)
+        keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, convertedKeyGenClassName)
+
+        KeyGenUtils.createKeyGeneratorByClassName(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
+      }
+
+  override def getRecordKey(record: GenericRecord): String =
+    originalKeyGen.map {
+      _.getKey(record).getRecordKey
+    } getOrElse {
+      complexKeyGen.getRecordKey(record)
     }
+
+  override def getRecordKey(row: Row): String =
+    originalKeyGen.map {
+      _.getRecordKey(row)
+    } getOrElse {
+      complexKeyGen.getRecordKey(row)
+    }
+
+
+  override def getRecordKey(internalRow: InternalRow, schema: StructType): UTF8String =
+    originalKeyGen.map {
+      _.getRecordKey(internalRow, schema)
+    } getOrElse {
+      complexKeyGen.getRecordKey(internalRow, schema)
+    }
+
+  override def getPartitionPath(record: GenericRecord): String = {
+    val partitionPath = originalKeyGen.map {
+      _.getKey(record).getPartitionPath
+    } getOrElse {
+      complexKeyGen.getPartitionPath(record)
+    }
+
+    convertPartitionPathToSqlType(partitionPath, rowType = false)
   }
 
-  override def getRecordKey(record: GenericRecord): String = {
-    if (originKeyGen.isDefined) {
-      originKeyGen.get.getKey(record).getRecordKey
-    } else {
-      super.getRecordKey(record)
+  override def getPartitionPath(row: Row): String = {
+    val partitionPath = originalKeyGen.map {
+      _.getPartitionPath(row)
+    } getOrElse {
+      complexKeyGen.getPartitionPath(row)
     }
+
+    convertPartitionPathToSqlType(partitionPath, rowType = true)
   }
 
-  override def getRecordKey(row: Row): String = {
-    if (originKeyGen.isDefined) {
-      originKeyGen.get.asInstanceOf[SparkKeyGeneratorInterface].getRecordKey(row)
-    } else {
-      super.getRecordKey(row)
+  override def getPartitionPath(internalRow: InternalRow, schema: StructType): UTF8String = {
+    val partitionPath = originalKeyGen.map {
+      _.getPartitionPath(internalRow, schema)
+    } getOrElse {
+      complexKeyGen.getPartitionPath(internalRow, schema)
+    }
+
+    UTF8String.fromString(convertPartitionPathToSqlType(partitionPath.toString, rowType = true))
+  }
+
+  override def getRecordKeyFieldNames: util.List[String] = {
+    originalKeyGen.map(_.getRecordKeyFieldNames)
+      .getOrElse(complexKeyGen.getRecordKeyFieldNames)
+  }
+
+  override def getPartitionPathFields: util.List[String] = {
+    originalKeyGen.map {
+      case bkg: BaseKeyGenerator => bkg.getPartitionPathFields
+      case _ =>
+        Option(super.getPartitionPathFields).getOrElse(Collections.emptyList[String])
+    } getOrElse {
+      complexKeyGen.getPartitionPathFields
     }
   }
 
+  // TODO clean up
   private def convertPartitionPathToSqlType(partitionPath: String, rowType: Boolean): String = {
     if (partitionSchema.isDefined) {
       // we can split the partitionPath here because we enable the URL_ENCODE_PARTITIONING_OPT
@@ -113,30 +171,11 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
       }
     } else partitionPath
   }
-
-  override def getPartitionPath(record: GenericRecord): String = {
-    val partitionPath = super.getPartitionPath(record)
-    convertPartitionPathToSqlType(partitionPath, rowType = false)
-  }
-
-  override def getPartitionPath(row: Row): String = {
-    val partitionPath = super.getPartitionPath(row)
-    convertPartitionPathToSqlType(partitionPath, rowType = true)
-  }
 }
 
 object SqlKeyGenerator {
 
   val PARTITION_SCHEMA = "hoodie.sql.partition.schema"
-  val ORIGIN_KEYGEN_CLASS_NAME = "hoodie.sql.origin.keygen.class"
+  val ORIGINAL_KEYGEN_CLASS_NAME = "hoodie.sql.origin.keygen.class"
   private val timestampTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
   private val sqlTimestampFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.S")
-
-  def getRealKeyGenClassName(props: TypedProperties): String = {
-    val beforeKeyGenClassName = props.getString(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null)
-    if (beforeKeyGenClassName != null && beforeKeyGenClassName.nonEmpty) {
-      HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(beforeKeyGenClassName)
-    } else {
-      DataSourceOptionsHelper.inferKeyGenClazz(props)
-    }
-  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 17ff34c909be..2761a002055a 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -534,7 +534,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
       URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
       KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-      SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
       HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
       HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
       HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 60d870a05fd7..4e4fe43ff94b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -1064,7 +1064,7 @@ class TestHoodieSparkSqlWriter {
     // for sql write
     val m2 = Map(
       HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getName,
-      SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> classOf[SimpleKeyGenerator].getName
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> classOf[SimpleKeyGenerator].getName
     )
     val kg2 = HoodieWriterUtils.getOriginKeyGenerator(m2)
     assertTrue(kg2 == classOf[SimpleKeyGenerator].getName)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
index f6ce92b41586..86dbae5b1e81 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, SchemaTestUtil}
 import org.apache.hudi.testutils.HoodieSparkWriteableTestTable
 import org.apache.spark.api.java.JavaSparkContext
+import org.junit.jupiter.api.Assertions.assertEquals
 
 import java.io.IOException
 import java.net.URL
@@ -109,10 +110,36 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase {
     val newProps: URL = this.getClass.getClassLoader.getResource("table-config.properties")
 
     // overwrite hoodie props
-    val Result = spark.sql(s"""call repair_overwrite_hoodie_props(table => '$tableName', new_props_file_path => '${newProps.getPath}')""").collect()
-    assertResult(15) {
-      Result.length
-    }
+    val expectedOutput ="""
+        |[hoodie.archivelog.folder,archived,archive]
+        |[hoodie.database.name,default,null]
+        |[hoodie.datasource.write.drop.partition.columns,false,false]
+        |[hoodie.datasource.write.hive_style_partitioning,true,null]
+        |[hoodie.datasource.write.partitionpath.urlencode,false,null]
+        |[hoodie.table.checksum,,]
+        |[hoodie.table.create.schema,,]
+        |[hoodie.table.keygenerator.class,org.apache.hudi.keygen.NonpartitionedKeyGenerator,null]
+        |[hoodie.table.name,,]
+        |[hoodie.table.precombine.field,ts,null]
+        |[hoodie.table.recordkey.fields,id,null]
+        |[hoodie.table.type,COPY_ON_WRITE,COPY_ON_WRITE]
+        |[hoodie.table.version,,]
+        |[hoodie.timeline.layout.version,,]""".stripMargin.trim
+
+    val actual = spark.sql(s"""call repair_overwrite_hoodie_props(table => '$tableName', new_props_file_path => '${newProps.getPath}')""")
+      .collect()
+      .map {
+        // omit these properties with variant values
+        case row if row.getString(0).equals("hoodie.table.checksum") => "[hoodie.table.checksum,,]"
+        case row if row.getString(0).equals("hoodie.table.create.schema") => "[hoodie.table.create.schema,,]"
+        case row if row.getString(0).equals("hoodie.table.name") => "[hoodie.table.name,,]"
+        case row if row.getString(0).equals("hoodie.table.version") => "[hoodie.table.version,,]"
+        case row if row.getString(0).equals("hoodie.timeline.layout.version") => "[hoodie.timeline.layout.version,,]"
+        case o => o.toString()
+      }
+      .mkString("\n")
+
+    assertEquals(expectedOutput, actual)
   }
 }