Merged
@@ -518,9 +518,13 @@ public Option<String[]> getPartitionFields() {

   /**
    * @return the partition field prop.
+   * @deprecated please use {@link #getPartitionFields()} instead
    */
+  @Deprecated
   public String getPartitionFieldProp() {
-    return getString(PARTITION_FIELDS);
+    // NOTE: We're adding a stub returning empty string to stay compatible w/ pre-existing
+    // behavior until this method is fully deprecated
+    return Option.ofNullable(getString(PARTITION_FIELDS)).orElse("");
   }

   /**
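
For context on the stub above: getString(PARTITION_FIELDS) returns null when the property is unset, so the wrapper guarantees callers a non-null string. A minimal sketch of the fallback semantics, assuming Hudi's org.apache.hudi.common.util.Option utility behaves like java.util.Optional:

    import org.apache.hudi.common.util.Option

    // Simulates an unset partition-fields property: getString(...) yields null,
    // and the Option wrapper turns that into an empty string for legacy callers.
    val raw: String = null
    val prop: String = Option.ofNullable(raw).orElse("")
    assert(prop.isEmpty)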
@@ -31,6 +31,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.sync.common.util.ConfigUtils
+import org.apache.hudi.util.JFunction
 import org.apache.log4j.LogManager
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils => SparkDataSourceUtils}
@@ -331,14 +332,14 @@ object DataSourceWriteOptions {

   /**
    * Key generator class that will extract the key out of the incoming record.
    */
-  val keyGeneraterInferFunc = DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) => {
-    Option.of(DataSourceOptionsHelper.inferKeyGenClazz(p.getProps))
+  val keyGeneratorInferFunc = JFunction.toJavaFunction((config: HoodieConfig) => {
+    Option.of(DataSourceOptionsHelper.inferKeyGenClazz(config.getProps))
   })

   val KEYGENERATOR_CLASS_NAME: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.keygenerator.class")
     .defaultValue(classOf[SimpleKeyGenerator].getName)
-    .withInferFunction(keyGeneraterInferFunc)
+    .withInferFunction(keyGeneratorInferFunc)
     .withDocumentation("Key generator class that implements `org.apache.hudi.keygen.KeyGenerator`")

 val KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED: ConfigProperty[String] = KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED
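
The helper used above replaces the implicit scalaFunctionToJavaFunction conversion that this change removes from DataSourceOptionsHelper (see the hunk below). A hedged sketch of the explicit wrapping, assuming JFunction.toJavaFunction adapts a Scala closure to a java.util.function.Function:

    import org.apache.hudi.util.JFunction

    // The explicit call makes the Scala-to-Java conversion visible at the call
    // site instead of relying on an implicit conversion being in scope.
    val upper: java.util.function.Function[String, String] =
      JFunction.toJavaFunction((s: String) => s.toUpperCase)
    assert(upper.apply("hudi") == "HUDI")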
@@ -811,12 +812,6 @@ object DataSourceOptionsHelper {
     }
   }

-  implicit def scalaFunctionToJavaFunction[From, To](function: (From) => To): JavaFunction[From, To] = {
-    new JavaFunction[From, To] {
-      override def apply (input: From): To = function (input)
-    }
-  }
-
   implicit def convert[T, U](prop: ConfigProperty[T])(implicit converter: T => U): ConfigProperty[U] = {
     checkState(prop.hasDefaultValue)
     var newProp: ConfigProperty[U] = ConfigProperty.key(prop.key())
@@ -112,7 +112,7 @@ object HoodieWriterUtils {
   def getOriginKeyGenerator(parameters: Map[String, String]): String = {
     val kg = parameters.getOrElse(KEYGENERATOR_CLASS_NAME.key(), null)
     if (classOf[SqlKeyGenerator].getCanonicalName == kg) {
-      parameters.getOrElse(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null)
+      parameters.getOrElse(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME, null)
     } else {
       kg
     }
@@ -190,12 +190,18 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
     } else {
       val (recordName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(table.identifier.table)
       val schema = SchemaConverters.toAvroType(finalSchema, false, recordName, namespace)
+      val partitionColumns = if (table.partitionColumnNames.isEmpty) {
+        null
+      } else {
+        table.partitionColumnNames.mkString(",")
+      }
+
       HoodieTableMetaClient.withPropertyBuilder()
         .fromProperties(properties)
         .setDatabaseName(catalogDatabaseName)
         .setTableName(table.identifier.table)
         .setTableCreateSchema(schema.toString())
-        .setPartitionFields(table.partitionColumnNames.mkString(","))
+        .setPartitionFields(partitionColumns)
         .initTable(hadoopConf, tableLocation)
     }
   }
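
The new guard matters for non-partitioned tables: mkString(",") on an empty column list yields an empty string, not null, so the old call recorded an empty partition-fields property in the table config. A minimal sketch of the distinction in plain Scala:

    // A non-partitioned table has no partition columns.
    val cols: Seq[String] = Seq.empty
    assert(cols.mkString(",") == "")                 // old behavior: "" was recorded
    val partitionColumns =
      if (cols.isEmpty) null else cols.mkString(",")
    assert(partitionColumns == null)                 // new behavior: property left unset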
@@ -69,7 +69,7 @@ trait ProvidesHoodieConfig extends Logging {
       HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
       URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
       KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-      SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
       OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
       PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
       HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
@@ -180,7 +180,7 @@
       HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
       URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
       KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-      SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
       RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
       PRECOMBINE_FIELD.key -> preCombineField,
       PARTITIONPATH_FIELD.key -> partitionFieldsStr,
@@ -264,7 +264,7 @@
       HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
       URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
       KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-      SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
       OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
       PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
       HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
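
All three config maps above follow the same wrapper pattern: the SQL write path always installs SqlKeyGenerator and passes the table's real generator through the ORIGINAL_KEYGEN_CLASS_NAME option. A hedged sketch of just the two cooperating options, with the SimpleKeyGenerator value chosen purely for illustration:

    // Hypothetical fragment: keys as defined in this change, value illustrative.
    val keyGenOptions = Map(
      "hoodie.datasource.write.keygenerator.class" -> "org.apache.spark.sql.hudi.command.SqlKeyGenerator",
      "hoodie.sql.origin.keygen.class" -> "org.apache.hudi.keygen.SimpleKeyGenerator")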
@@ -18,24 +18,28 @@
 package org.apache.spark.sql.hudi.command

 import org.apache.avro.generic.GenericRecord
-import org.apache.hudi.DataSourceOptionsHelper
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.model.HoodieKey
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
+import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.{StructType, TimestampType}
+import org.apache.spark.unsafe.types.UTF8String
 import org.joda.time.format.DateTimeFormat

 import java.sql.Timestamp
+import java.util
+import java.util.Collections
 import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS}

 /**
- * A complex key generator for sql command which do some process for the
- * timestamp data type partition field.
+ * Custom Spark-specific [[KeyGenerator]] overriding behavior handling [[TimestampType]] partition values
  */
-class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props) {
+class SqlKeyGenerator(props: TypedProperties) extends BuiltinKeyGenerator(props) {

   private lazy val partitionSchema = {
     val partitionSchema = props.getString(SqlKeyGenerator.PARTITION_SCHEMA, "")
@@ -45,37 +49,91 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
       None
     }
   }

-  // The origin key generator class for this table.
-  private lazy val originKeyGen = {
-    val beforeKeyGenClassName = props.getString(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null)
-    if (beforeKeyGenClassName != null && beforeKeyGenClassName.nonEmpty) {
-      val keyGenProps = new TypedProperties()
-      keyGenProps.putAll(props)
-      keyGenProps.remove(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME)
-      val convertedKeyGenClassName = SqlKeyGenerator.getRealKeyGenClassName(props)
-      keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, convertedKeyGenClassName)
-      Some(KeyGenUtils.createKeyGeneratorByClassName(keyGenProps))
-    } else {
-      None
-    }
-  }
-
-  override def getRecordKey(record: GenericRecord): String = {
-    if (originKeyGen.isDefined) {
-      originKeyGen.get.getKey(record).getRecordKey
-    } else {
-      super.getRecordKey(record)
-    }
-  }
-
-  override def getRecordKey(row: Row): String = {
-    if (originKeyGen.isDefined) {
-      originKeyGen.get.asInstanceOf[SparkKeyGeneratorInterface].getRecordKey(row)
-    } else {
-      super.getRecordKey(row)
-    }
-  }
+  private lazy val complexKeyGen = new ComplexKeyGenerator(props)
+  private lazy val originalKeyGen =
+    Option(props.getString(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME, null))
+      .map { originalKeyGenClassName =>
+        checkArgument(originalKeyGenClassName.nonEmpty)
+
+        val convertedKeyGenClassName = HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(originalKeyGenClassName)
+
+        val keyGenProps = new TypedProperties()
+        keyGenProps.putAll(props)
+        keyGenProps.remove(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME)
+        keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, convertedKeyGenClassName)
+
+        KeyGenUtils.createKeyGeneratorByClassName(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
+      }
+
+  override def getRecordKey(record: GenericRecord): String =
+    originalKeyGen.map {
+      _.getKey(record).getRecordKey
+    } getOrElse {
+      complexKeyGen.getRecordKey(record)
+    }
+
+  override def getRecordKey(row: Row): String =
+    originalKeyGen.map {
+      _.getRecordKey(row)
+    } getOrElse {
+      complexKeyGen.getRecordKey(row)
+    }
+
+  override def getRecordKey(internalRow: InternalRow, schema: StructType): UTF8String =
+    originalKeyGen.map {
+      _.getRecordKey(internalRow, schema)
+    } getOrElse {
+      complexKeyGen.getRecordKey(internalRow, schema)
+    }
+
+  override def getPartitionPath(record: GenericRecord): String = {
+    val partitionPath = originalKeyGen.map {
+      _.getKey(record).getPartitionPath
+    } getOrElse {
+      complexKeyGen.getPartitionPath(record)
+    }
+
+    convertPartitionPathToSqlType(partitionPath, rowType = false)
+  }
+
+  override def getPartitionPath(row: Row): String = {
+    val partitionPath = originalKeyGen.map {
+      _.getPartitionPath(row)
+    } getOrElse {
+      complexKeyGen.getPartitionPath(row)
+    }
+
+    convertPartitionPathToSqlType(partitionPath, rowType = true)
+  }
+
+  override def getPartitionPath(internalRow: InternalRow, schema: StructType): UTF8String = {
+    val partitionPath = originalKeyGen.map {
+      _.getPartitionPath(internalRow, schema)
+    } getOrElse {
+      complexKeyGen.getPartitionPath(internalRow, schema)
+    }
+
+    UTF8String.fromString(convertPartitionPathToSqlType(partitionPath.toString, rowType = true))
+  }
+
+  override def getRecordKeyFieldNames: util.List[String] = {
+    originalKeyGen.map(_.getRecordKeyFieldNames)
+      .getOrElse(complexKeyGen.getRecordKeyFieldNames)
+  }
+
+  override def getPartitionPathFields: util.List[String] = {
+    originalKeyGen.map {
+      case bkg: BaseKeyGenerator => bkg.getPartitionPathFields
+      case _ =>
+        Option(super.getPartitionPathFields).getOrElse(Collections.emptyList[String])
+    } getOrElse {
+      complexKeyGen.getPartitionPathFields
+    }
+  }
+
+  // TODO clean up
   private def convertPartitionPathToSqlType(partitionPath: String, rowType: Boolean): String = {
     if (partitionSchema.isDefined) {
       // we can split the partitionPath here because we enable the URL_ENCODE_PARTITIONING_OPT
@@ -113,30 +171,11 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
       }
     } else partitionPath
   }
-
-  override def getPartitionPath(record: GenericRecord): String = {
-    val partitionPath = super.getPartitionPath(record)
-    convertPartitionPathToSqlType(partitionPath, rowType = false)
-  }
-
-  override def getPartitionPath(row: Row): String = {
-    val partitionPath = super.getPartitionPath(row)
-    convertPartitionPathToSqlType(partitionPath, rowType = true)
-  }
 }

 object SqlKeyGenerator {
   val PARTITION_SCHEMA = "hoodie.sql.partition.schema"
-  val ORIGIN_KEYGEN_CLASS_NAME = "hoodie.sql.origin.keygen.class"
+  val ORIGINAL_KEYGEN_CLASS_NAME = "hoodie.sql.origin.keygen.class"
   private val timestampTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
   private val sqlTimestampFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.S")
-
-  def getRealKeyGenClassName(props: TypedProperties): String = {
-    val beforeKeyGenClassName = props.getString(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null)
-    if (beforeKeyGenClassName != null && beforeKeyGenClassName.nonEmpty) {
-      HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(beforeKeyGenClassName)
-    } else {
-      DataSourceOptionsHelper.inferKeyGenClazz(props)
-    }
-  }
 }
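
The net effect of the SqlKeyGenerator rewrite is composition over inheritance: the class now consults the table's original key generator when one is configured and otherwise falls back to an owned ComplexKeyGenerator. A simplified, self-contained sketch of that dispatch shape (hypothetical trait and names, not the actual classes):

    // Prefer the configured original generator; fall back to the complex one.
    trait KeyGen { def recordKey(rec: Map[String, String]): String }

    class FallbackKeyGen(original: Option[KeyGen], complex: KeyGen) extends KeyGen {
      override def recordKey(rec: Map[String, String]): String =
        original.map(_.recordKey(rec)).getOrElse(complex.recordKey(rec))
    }

    // With no original generator configured, dispatch lands on the fallback.
    val gen = new FallbackKeyGen(None, (rec: Map[String, String]) => rec("id"))
    assert(gen.recordKey(Map("id" -> "k1")) == "k1")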
@@ -534,7 +534,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
       URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
       KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-      SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
       HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
       HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
       HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE),
@@ -1064,7 +1064,7 @@ class TestHoodieSparkSqlWriter {
     // for sql write
     val m2 = Map(
       HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getName,
-      SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> classOf[SimpleKeyGenerator].getName
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> classOf[SimpleKeyGenerator].getName
     )
     val kg2 = HoodieWriterUtils.getOriginKeyGenerator(m2)
     assertTrue(kg2 == classOf[SimpleKeyGenerator].getName)
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, SchemaTestUtil}
 import org.apache.hudi.testutils.HoodieSparkWriteableTestTable
 import org.apache.spark.api.java.JavaSparkContext
+import org.junit.jupiter.api.Assertions.assertEquals

 import java.io.IOException
 import java.net.URL
@@ -109,10 +110,36 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase {
     val newProps: URL = this.getClass.getClassLoader.getResource("table-config.properties")

     // overwrite hoodie props
-    val Result = spark.sql(s"""call repair_overwrite_hoodie_props(table => '$tableName', new_props_file_path => '${newProps.getPath}')""").collect()
-    assertResult(15) {
-      Result.length
-    }
+    val expectedOutput = """
+      |[hoodie.archivelog.folder,archived,archive]
+      |[hoodie.database.name,default,null]
+      |[hoodie.datasource.write.drop.partition.columns,false,false]
+      |[hoodie.datasource.write.hive_style_partitioning,true,null]
+      |[hoodie.datasource.write.partitionpath.urlencode,false,null]
+      |[hoodie.table.checksum,,]
+      |[hoodie.table.create.schema,,]
+      |[hoodie.table.keygenerator.class,org.apache.hudi.keygen.NonpartitionedKeyGenerator,null]
+      |[hoodie.table.name,,]
+      |[hoodie.table.precombine.field,ts,null]
+      |[hoodie.table.recordkey.fields,id,null]
+      |[hoodie.table.type,COPY_ON_WRITE,COPY_ON_WRITE]
+      |[hoodie.table.version,,]
+      |[hoodie.timeline.layout.version,,]""".stripMargin.trim
+
+    val actual = spark.sql(s"""call repair_overwrite_hoodie_props(table => '$tableName', new_props_file_path => '${newProps.getPath}')""")
+      .collect()
+      .map {
+        // omit these properties with variant values
+        case row if row.getString(0).equals("hoodie.table.checksum") => "[hoodie.table.checksum,,]"
+        case row if row.getString(0).equals("hoodie.table.create.schema") => "[hoodie.table.create.schema,,]"
+        case row if row.getString(0).equals("hoodie.table.name") => "[hoodie.table.name,,]"
+        case row if row.getString(0).equals("hoodie.table.version") => "[hoodie.table.version,,]"
+        case row if row.getString(0).equals("hoodie.timeline.layout.version") => "[hoodie.timeline.layout.version,,]"
+        case o => o.toString()
+      }
+      .mkString("\n")
+
+    assertEquals(expectedOutput, actual)
   }
 }

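
The rewritten assertion pins the full procedure output rather than only the row count; properties whose values vary from run to run are first masked with stable placeholders. A minimal sketch of that masking idea (hypothetical helper, not part of the test):

    // Replace volatile values (checksums, versions, schemas) before comparing.
    val volatileKeys = Set("hoodie.table.checksum", "hoodie.table.version")
    def mask(key: String, value: String): String =
      if (volatileKeys.contains(key)) s"[$key,,]" else s"[$key,$value,]"
    assert(mask("hoodie.table.checksum", "3913123") == "[hoodie.table.checksum,,]")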