
Commit 8c5b113

prashantwason and claude committed
fix(spark): catch HoodieSchemaNotFoundException in 3-arg DefaultSource.createRelation
The 2-arg `createRelation(sqlContext, parameters)` overload wraps its body in a try/catch that converts `HoodieSchemaNotFoundException` to `EmptyRelation` (added in HUDI-7147 / #10689).

The 3-arg `createRelation(sqlContext, optParams, schema)` overload — which Spark's `DataSource.resolveRelation()` invokes directly via the `SchemaRelationProvider` path whenever a user-supplied schema is present (e.g. `spark.read.schema(s).format("hudi").load(path)`, or HMS-catalog resolution that already knows the schema) — has no such catch, so the exception propagates and breaks query analysis.

Mirror the 2-arg catch on the 3-arg overload so behavior is symmetric: schema-less Hudi tables resolve to an empty relation regardless of which overload Spark invokes.

Also adds `TestCOWDataSource.testReadOfAnEmptyTableWithUserSuppliedSchema`, a sibling of the existing `testReadOfAnEmptyTable` that exercises the 3-arg path.

Closes #18668

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
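To make the dispatch asymmetry concrete, here is a minimal, self-contained Scala sketch of the pre-fix behavior. The names (`HudiLikeSource`, `SchemaNotFoundException`, `resolveRelation`) are simplified stand-ins for illustration, not actual Spark or Hudi APIs: they model how a user-supplied schema routes resolution to the schema-aware overload, so a catch that lives only in the schema-less overload is bypassed.

```scala
// Stand-in for HoodieSchemaNotFoundException (assumption: plain types used
// here instead of sqlContext/StructType to keep the sketch runnable).
class SchemaNotFoundException extends RuntimeException

sealed trait Relation
case object EmptyRelation extends Relation
case object TableRelation extends Relation

class HudiLikeSource(tableHasSchema: Boolean) {
  // Schema-less overload (models the 2-arg path): catches the exception
  // and degrades to an empty relation.
  def createRelation(params: Map[String, String]): Relation =
    try createRelation(params, "inferred-schema")
    catch { case _: SchemaNotFoundException => EmptyRelation }

  // Schema-aware overload (models the 3-arg path): before the fix,
  // the exception escaped from here with no catch.
  def createRelation(params: Map[String, String], userSchema: String): Relation =
    if (tableHasSchema) TableRelation
    else throw new SchemaNotFoundException
}

// Models Spark's DataSource.resolveRelation(): a user-supplied schema
// selects the schema-aware overload directly.
def resolveRelation(src: HudiLikeSource, userSchema: Option[String]): Relation =
  userSchema match {
    case Some(s) => src.createRelation(Map.empty, s) // SchemaRelationProvider path
    case None    => src.createRelation(Map.empty)    // RelationProvider path
  }
```

With a table that has no resolvable schema, `resolveRelation(src, None)` returns `EmptyRelation`, while `resolveRelation(src, Some(...))` throws — which is exactly the asymmetry the commit removes by mirroring the catch onto the schema-aware overload.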
1 parent 38db5ed commit 8c5b113

2 files changed

Lines changed: 41 additions & 1 deletion

File tree

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala

Lines changed: 10 additions & 1 deletion
@@ -134,7 +134,16 @@ class DefaultSource extends RelationProvider
       parameters
     }

-    val relation = DefaultSource.createRelation(sqlContext, metaClient, schema, options.toMap)
+    // Spark's DataSource.resolveRelation() invokes this 3-arg overload directly via the
+    // SchemaRelationProvider path when a user-supplied schema is present (e.g.
+    // spark.read.schema(...).load(path)). The 2-arg overload catches
+    // HoodieSchemaNotFoundException and returns an EmptyRelation, but that catch is bypassed
+    // on this path, so we mirror the same handling here.
+    val relation = try {
+      DefaultSource.createRelation(sqlContext, metaClient, schema, options.toMap)
+    } catch {
+      case _: HoodieSchemaNotFoundException => new EmptyRelation(sqlContext, new StructType())
+    }
     log.info(s"Created relation ${relation.getClass.getSimpleName} with ${options.size} resolved options")
     relation
   }

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala

Lines changed: 31 additions & 0 deletions
@@ -2214,6 +2214,37 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(count, 0)
   }

+  @Test
+  def testReadOfAnEmptyTableWithUserSuppliedSchema(): Unit = {
+    val (writeOpts, _) = getWriterReaderOpts(HoodieRecordType.AVRO)
+
+    // Insert + then delete the only completed commit so the table has no resolvable schema.
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).asScala.toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF.write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val fileStatuses = storage.listDirectEntries(
+      new StoragePath(basePath + StoragePath.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME
+        + StoragePath.SEPARATOR + HoodieTableMetaClient.TIMELINEFOLDER_NAME),
+      new StoragePathFilter {
+        override def accept(path: StoragePath): Boolean = {
+          path.getName.endsWith(HoodieTimeline.COMMIT_ACTION)
+        }
+      })
+    storage.deleteFile(fileStatuses.get(0).getPath)
+
+    // spark.read.schema(...) triggers Spark's SchemaRelationProvider path which calls the
+    // 3-arg DefaultSource.createRelation overload directly. Without the catch on that
+    // overload, this would fail with HoodieSchemaNotFoundException.
+    val userSchema = inputDF.schema
+    val count = spark.read.schema(userSchema).format("hudi").load(basePath).count()
+    assertEquals(count, 0)
+  }
+
   /**
    * Test incremental queries and time travel queries with event time ordering.
    *