Commit 5e773a6

Restore fallback to HadoopFsRelation

Author: Alexey Kudinkin (committed)
1 parent: d97cbe9 · commit: 5e773a6

2 files changed: 90 additions and 5 deletions

File: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
(68 additions and 4 deletions)
@@ -59,10 +59,8 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
   // For more details please check HUDI-4161
   // NOTE: This override has to mirror the semantics of whenever this Relation is converted into [[HadoopFsRelation]],
   //       which is currently done in all cases, except when Schema Evolution is enabled
-  override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
-    val enableSchemaOnRead = !internalSchema.isEmptySchema
-    !enableSchemaOnRead
-  }
+  override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean =
+    internalSchemaOpt.isEmpty
 
   override lazy val mandatoryFields: Seq[String] =
     // TODO reconcile, record's key shouldn't be mandatory for base-file only relation
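
Aside: a minimal standalone sketch (not part of the commit) of what the rewritten flag computes. The Option[String] below stands in for Hudi's internal-schema handle, an assumption made purely for illustration: None models a table with no internal schema, i.e. schema-on-read (Schema Evolution) disabled.

object PartitionValueFlagSketch extends App {
  // None stands in for "no internal schema", i.e. schema-on-read is disabled.
  val internalSchemaOpt: Option[String] = None

  // Mirrors the new override: extract partition values from the partition path
  // only when schema-on-read is disabled, which is exactly when this relation
  // is converted into a HadoopFsRelation.
  val shouldExtractPartitionValuesFromPartitionPath: Boolean = internalSchemaOpt.isEmpty

  println(shouldExtractPartitionValuesFromPartitionPath) // prints: true
}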
@@ -116,4 +114,70 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
     sparkAdapter.getFilePartitions(sparkSession, fileSplits, maxSplitBytes)
       .map(HoodieBaseFileSplit.apply)
   }
+
+  /**
+   * NOTE: We have to fall back to [[HadoopFsRelation]] to make sure that all of the Spark optimizations can be
+   *       equally applied to Hudi tables, since some of them are predicated on the usage of [[HadoopFsRelation]]
+   *       and won't be applicable when we use our own custom relations (one such optimization is the
+   *       [[SchemaPruning]] rule; you can find more details in HUDI-3896)
+   */
+  def toHadoopFsRelation: HadoopFsRelation = {
+    val (tableFileFormat, formatClassName) =
+      metaClient.getTableConfig.getBaseFileFormat match {
+        case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
+        case HoodieFileFormat.PARQUET =>
+          // We're delegating to Spark to append partition values to every row only in cases
+          // when the corresponding partition values are not persisted within the data file itself
+          val parquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
+          (parquetFileFormat, HoodieParquetFileFormat.FILE_FORMAT_ID)
+      }
+
+    if (globPaths.isEmpty) {
+      // NOTE: There are currently 2 ways partition values could be fetched:
+      //         - Source columns (producing the values used for physical partitioning) are read
+      //           from the data file
+      //         - Values parsed from the actual partition path are appended to the final dataset
+      //
+      //       In the former case we don't need to provide the partition schema to the relation,
+      //       therefore we simply stub it with an empty schema and use the full table schema as
+      //       the one being read from the data file.
+      //
+      //       In the latter we have to specify a proper partition schema as well as the "data" schema,
+      //       essentially the table schema with all partition columns stripped out
+      val (partitionSchema, dataSchema) = if (shouldExtractPartitionValuesFromPartitionPath) {
+        (fileIndex.partitionSchema, fileIndex.dataSchema)
+      } else {
+        (StructType(Nil), tableStructSchema)
+      }
+
+      HadoopFsRelation(
+        location = fileIndex,
+        partitionSchema = partitionSchema,
+        dataSchema = dataSchema,
+        bucketSpec = None,
+        fileFormat = tableFileFormat,
+        optParams)(sparkSession)
+    } else {
+      val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key)
+      val extraReadPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
+
+      DataSource.apply(
+        sparkSession = sparkSession,
+        paths = extraReadPaths,
+        userSpecifiedSchema = userSchema,
+        className = formatClassName,
+        // Since we're reading the table as just a collection of files, we have to make sure we only
+        // read the latest version of every Hudi file-group, which might have been compacted, clustered,
+        // etc., while previous versions of the files are kept around as well.
+        //
+        // We rely on [[HoodieROTablePathFilter]] to do the proper filtering and assure that
+        options = optParams ++ Map(
+          "mapreduce.input.pathFilter.class" -> classOf[HoodieROTablePathFilter].getName
+        ),
+        partitionColumns = partitionColumns
+      )
+        .resolveRelation()
+        .asInstanceOf[HadoopFsRelation]
+    }
+  }
 }
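
Aside: the partition-schema/data-schema branching above can be hard to follow inside the diff, so here is a standalone sketch (not part of the commit) of the same decision. It assumes spark-sql on the classpath and identifies the partition column by name for illustration; the actual code takes both schemas from the fileIndex instead.

import org.apache.spark.sql.types.{StringType, StructField, StructType}

object SchemaSplitSketch extends App {
  // Full table schema, including the physical partition column "dt" (made-up names).
  val tableStructSchema = StructType(Seq(
    StructField("id", StringType),
    StructField("ts", StringType),
    StructField("dt", StringType)
  ))
  val partitionColumns = Seq("dt")

  def split(shouldExtractPartitionValuesFromPartitionPath: Boolean): (StructType, StructType) =
    if (shouldExtractPartitionValuesFromPartitionPath) {
      // Partition values are re-parsed from the path and appended by Spark, so the
      // relation needs a real partition schema, with those columns stripped from the data schema.
      val (part, data) = tableStructSchema.partition(f => partitionColumns.contains(f.name))
      (StructType(part), StructType(data))
    } else {
      // Partition values are persisted inside the data files: stub the partition
      // schema with an empty one and read the full table schema from the files.
      (StructType(Nil), tableStructSchema)
    }

  println(split(shouldExtractPartitionValuesFromPartitionPath = true))  // ((dt), (id, ts))
  println(split(shouldExtractPartitionValuesFromPartitionPath = false)) // ((), (id, ts, dt))
}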

File: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
(22 additions and 1 deletion)
@@ -126,7 +126,8 @@ class DefaultSource extends RelationProvider
       case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
           (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
           (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
-        new BaseFileOnlyRelation(sqlContext, metaClient, parameters, userSchema, globPaths)
+        resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
+
       case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
         new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
 
@@ -225,6 +226,26 @@ class DefaultSource extends RelationProvider
     new HoodieStreamSource(sqlContext, metadataPath, schema, parameters)
   }
 
+  private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
+                                          globPaths: Seq[Path],
+                                          userSchema: Option[StructType],
+                                          metaClient: HoodieTableMetaClient,
+                                          optParams: Map[String, String]): BaseRelation = {
+    val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema, globPaths)
+    val enableSchemaOnRead: Boolean = !tryFetchInternalSchema(metaClient).isEmptySchema
+
+    // NOTE: We fall back to [[HadoopFsRelation]] in all of the cases except the ones requiring
+    //       [[BaseFileOnlyRelation]] to function correctly. This is necessary to maintain performance
+    //       parity with vanilla Spark, since some of the Spark optimizations are predicated on the usage of [[HadoopFsRelation]].
+    //
+    //       You can check out HUDI-3896 for more details
+    if (enableSchemaOnRead) {
+      baseRelation
+    } else {
+      baseRelation.toHadoopFsRelation
+    }
+  }
+
   private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
                                           globPaths: Seq[Path],
                                           userSchema: Option[StructType],
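
Aside: a hypothetical end-to-end read (not part of the commit; the table path and column name are made up) showing the code path this change restores. A snapshot read of a COPY_ON_WRITE table without schema-on-read now resolves through resolveBaseFileOnlyRelation to a plain HadoopFsRelation, so optimizer rules such as SchemaPruning fire just as they would for a vanilla Parquet dataset. Assumes the Hudi Spark bundle is on the classpath.

import org.apache.spark.sql.SparkSession

object HudiSnapshotReadSketch extends App {
  val spark = SparkSession.builder()
    .appName("hudi-fallback-sketch")
    .master("local[*]")
    .getOrCreate()

  // The default query type is snapshot; with no schema evolution on the table,
  // DefaultSource now hands back baseRelation.toHadoopFsRelation.
  val df = spark.read.format("hudi").load("/tmp/hudi/trips")

  // The physical plan should show a plain FileScan over the base files.
  df.select("uuid").explain(true)

  spark.stop()
}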
