@@ -26,19 +26,20 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath, projectSchema}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
-import org.apache.hudi.internal.schema.utils.InternalSchemaUtils
 import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.hudi.io.storage.HoodieHFileReader
-import org.apache.spark.SerializableWritable
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
@@ -62,7 +63,7 @@ import scala.util.{Failure, Success, Try}
 
 trait HoodieFileSplit {}
 
-case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: InternalSchema = InternalSchema.getEmptyInternalSchema)
+case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: Option[InternalSchema] = None)
 
 case class HoodieTableState(tablePath: String,
                             latestCommitTimestamp: String,
@@ -132,33 +133,36 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    * NOTE: Initialization of teh following members is coupled on purpose to minimize amount of I/O
    *       required to fetch table's Avro and Internal schemas
    */
-  protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema) = {
+  protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = {
     val schemaResolver = new TableSchemaResolver(metaClient)
-    val avroSchema: Schema = schemaSpec.map(convertToAvroSchema).getOrElse {
-      Try(schemaResolver.getTableAvroSchema) match {
-        case Success(schema) => schema
+    val internalSchemaOpt = if (!isSchemaEvolutionEnabled) {
+      None
+    } else {
+      Try(schemaResolver.getTableInternalSchemaFromCommitMetadata) match {
+        case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt)
         case Failure(e) =>
-          logError("Failed to fetch schema from the table", e)
-          throw new HoodieSchemaException("Failed to fetch schema from the table")
+          logWarning("Failed to fetch internal-schema from the table", e)
+          None
       }
     }
 
-    val internalSchema: InternalSchema = if (!isSchemaEvolutionEnabled) {
-      InternalSchema.getEmptyInternalSchema
-    } else {
-      Try(schemaResolver.getTableInternalSchemaFromCommitMetadata) match {
-        case Success(internalSchemaOpt) =>
-          toScalaOption(internalSchemaOpt).getOrElse(InternalSchema.getEmptyInternalSchema)
+    val avroSchema = internalSchemaOpt.map { is =>
+      AvroInternalSchemaConverter.convert(is, "schema")
+    } orElse {
+      schemaSpec.map(convertToAvroSchema)
+    } getOrElse {
+      Try(schemaResolver.getTableAvroSchema) match {
+        case Success(schema) => schema
         case Failure(e) =>
-          logWarning("Failed to fetch internal-schema from the table", e)
-          InternalSchema.getEmptyInternalSchema
+          logError("Failed to fetch schema from the table", e)
+          throw new HoodieSchemaException("Failed to fetch schema from the table")
       }
     }
 
-    (avroSchema, internalSchema)
+    (avroSchema, internalSchemaOpt)
   }
 
-  protected lazy val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+  protected val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
 
   protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)
@@ -230,7 +234,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    * meaning that regardless of whether this columns are being requested by the query they will be fetched
    * regardless so that relation is able to combine records properly (if necessary)
    *
-   * @VisibleForTesting
+   * @VisibleInTests
    */
   val mandatoryFields: Seq[String]
@@ -247,7 +251,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   /**
    * Returns true in case table supports Schema on Read (Schema Evolution)
    */
-  def hasSchemaOnRead: Boolean = !internalSchema.isEmptySchema
+  def hasSchemaOnRead: Boolean = internalSchemaOpt.isDefined
 
   /**
    * Data schema is determined as the actual schema of the Table's Data Files (for ex, parquet/orc/etc);
@@ -270,9 +274,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   def canPruneRelationSchema: Boolean =
     (fileFormat.isInstanceOf[ParquetFileFormat] || fileFormat.isInstanceOf[OrcFileFormat]) &&
       // NOTE: Some relations might be disabling sophisticated schema pruning techniques (for ex, nested schema pruning)
-      canPruneSchema &&
       // TODO(HUDI-XXX) internal schema doesn't supported nested schema pruning currently
-      internalSchema.isEmptySchema
+      canPruneSchema && !hasSchemaOnRead
 
   override def schema: StructType = {
     // NOTE: Optimizer could prune the schema (applying for ex, [[NestedSchemaPruning]] rule) setting new updated
@@ -319,19 +322,17 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     //       w/ more than 2 types are involved)
     val sourceSchema = optimizerPrunedDataSchema.map(convertToAvroSchema).getOrElse(tableAvroSchema)
     val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
-      projectSchema(sourceSchema, targetColumns, Some(internalSchema))
+      projectSchema(Either.cond(internalSchemaOpt.isDefined, internalSchemaOpt.get, sourceSchema), targetColumns)
 
     val filterExpressions = convertToExpressions(filters)
     val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)
 
     val fileSplits = collectFileSplits(partitionFilters, dataFilters)
 
-    val tableAvroSchemaStr =
-      if (internalSchema.isEmptySchema) tableAvroSchema.toString
-      else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString
+    val tableAvroSchemaStr = tableAvroSchema.toString
 
-    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchema)
-    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, requiredInternalSchema)
+    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchemaOpt)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, Some(requiredInternalSchema))
 
     // Since schema requested by the caller might contain partition columns, we might need to
     // prune it, removing all partition columns from it in case these columns are not persisted
@@ -559,6 +560,17 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     }
   }
 
+  protected def embedInternalSchema(conf: Configuration, internalSchemaOpt: Option[InternalSchema]): Configuration = {
+    val internalSchema = internalSchemaOpt.getOrElse(InternalSchema.getEmptyInternalSchema)
+    val querySchemaString = SerDeHelper.toJson(internalSchema)
+    if (!isNullOrEmpty(querySchemaString)) {
+      conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema))
+      conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath)
+      conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
+    }
+    conf
+  }
+
   private def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
                                        requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = {
     if (shouldExtractPartitionValuesFromPartitionPath) {
@@ -601,33 +613,30 @@ object HoodieBaseRelation extends SparkAdapterSupport {
   /**
    * Projects provided schema by picking only required (projected) top-level columns from it
    *
-   * @param tableAvroSchema schema to project
+   * @param tableSchema schema to project (either of [[InternalSchema]] or Avro's [[Schema]])
    * @param requiredColumns required top-level columns to be projected
-   * @param internalSchemaOpt optional internal schema, providing for appropriate handling of schema evolution
    */
-  // TODO revisit internal schema handling
-  def projectSchema(tableAvroSchema: Schema,
-                    requiredColumns: Array[String],
-                    internalSchemaOpt: Option[InternalSchema] = None): (Schema, StructType, InternalSchema) = {
-    internalSchemaOpt match {
-      case Some(internalSchema) if !internalSchema.isEmptySchema =>
+  def projectSchema(tableSchema: Either[Schema, InternalSchema], requiredColumns: Array[String]): (Schema, StructType, InternalSchema) = {
+    tableSchema match {
+      case Right(internalSchema) =>
+        checkState(!internalSchema.isEmptySchema)
         // TODO extend pruning to leverage optimizer pruned schema
         val prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchema, requiredColumns.toList.asJava)
-        val requiredAvroSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, tableAvroSchema.getName)
+        val requiredAvroSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, "schema")
         val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
 
         (requiredAvroSchema, requiredStructSchema, prunedInternalSchema)
 
-      case _ =>
-        val fieldMap = tableAvroSchema.getFields.asScala.map(f => f.name() -> f).toMap
+      case Left(avroSchema) =>
+        val fieldMap = avroSchema.getFields.asScala.map(f => f.name() -> f).toMap
         val requiredFields = requiredColumns.map { col =>
           val f = fieldMap(col)
           // We have to create a new [[Schema.Field]] since Avro schemas can't share field
           // instances (and will throw "org.apache.avro.AvroRuntimeException: Field already used")
           new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())
         }.toList
-        val requiredAvroSchema = Schema.createRecord(tableAvroSchema.getName, tableAvroSchema.getDoc,
-          tableAvroSchema.getNamespace, tableAvroSchema.isError, requiredFields.asJava)
+        val requiredAvroSchema = Schema.createRecord(avroSchema.getName, avroSchema.getDoc,
+          avroSchema.getNamespace, avroSchema.isError, requiredFields.asJava)
         val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
 
         (requiredAvroSchema, requiredStructSchema, InternalSchema.getEmptyInternalSchema)
@@ -640,10 +649,11 @@ object HoodieBaseRelation extends SparkAdapterSupport {
                          filters: Seq[Filter],
                          options: Map[String, String],
                          hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    val hadoopConfBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf))
+    val hadoopConfBroadcast =
+      spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
     partitionedFile => {
-      val hadoopConf = hadoopConfBroadcast.value.value
+      val hadoopConf = hadoopConfBroadcast.value.get()
       val reader = new HoodieHFileReader[GenericRecord](hadoopConf, new Path(partitionedFile.filePath),
         new CacheConfig(hadoopConf))
0 commit comments