Commit b6b5159

[HUDI-4703] Use the historical schema to serve time travel queries (apache#6499)
1 parent b228a27 commit b6b5159

File tree (4 files changed, +184 -11 lines):

  hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
  hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
  hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
  hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala

hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java

Lines changed: 29 additions & 0 deletions
@@ -23,10 +23,12 @@
 import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaCompatibility;
 import org.apache.avro.generic.IndexedRecord;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -53,8 +55,10 @@
 import org.apache.hudi.io.storage.HoodieHFileReader;
 import org.apache.hudi.io.storage.HoodieOrcReader;
 import org.apache.hudi.util.Lazy;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -138,6 +142,19 @@ public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception
     return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
   }
 
+  /**
+   * Fetches tables schema in Avro format as of the given instant
+   *
+   * @param timestamp as of which table's schema will be fetched
+   */
+  public Schema getTableAvroSchema(String timestamp) throws Exception {
+    Option<HoodieInstant> instant = metaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants()
+        .findInstantsBeforeOrEquals(timestamp)
+        .lastInstant();
+    return getTableAvroSchemaInternal(metaClient.getTableConfig().populateMetaFields(), instant);
+  }
+
   /**
    * Fetches tables schema in Avro format as of the given instant
    *
@@ -496,6 +513,18 @@ public Option<InternalSchema> getTableInternalSchemaFromCommitMetadata() {
     return timeline.lastInstant().flatMap(this::getTableInternalSchemaFromCommitMetadata);
   }
 
+  /**
+   * Gets the InternalSchema for a hoodie table from the HoodieCommitMetadata of the instant.
+   *
+   * @return InternalSchema for this table
+   */
+  public Option<InternalSchema> getTableInternalSchemaFromCommitMetadata(String timestamp) {
+    HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants()
+        .findInstantsBeforeOrEquals(timestamp);
+    return timeline.lastInstant().flatMap(this::getTableInternalSchemaFromCommitMetadata);
+  }
+
   /**
    * Gets the InternalSchema for a hoodie table from the HoodieCommitMetadata of the instant.
    *
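Taken together, the two new overloads resolve the table schema as it stood at a given instant rather than the latest one: the resolver walks the completed commits timeline, keeps only instants at or before the requested timestamp, and reads the schema written by the last of those commits. A minimal stand-alone usage sketch (the base path and instant timestamp below are assumed values, not part of this commit):

import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}

object HistoricalSchemaSketch {
  def main(args: Array[String]): Unit = {
    val basePath = "/tmp/hoodie_test"      // assumed path of an existing Hudi table
    val asOfInstant = "20220822103000"     // assumed completed commit timestamp on the timeline

    val metaClient = HoodieTableMetaClient.builder()
      .setConf(new Configuration())
      .setBasePath(basePath)
      .build()

    // Resolves the Avro schema written by the last completed commit at or before asOfInstant.
    val schemaResolver = new TableSchemaResolver(metaClient)
    val historicalSchema = schemaResolver.getTableAvroSchema(asOfInstant)
    println(historicalSchema.toString(true))
  }
}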

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala

Lines changed: 10 additions & 1 deletion
@@ -19,10 +19,12 @@ package org.apache.hudi
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hbase.io.hfile.CacheConfig
 import org.apache.hadoop.mapred.JobConf
+
 import org.apache.hudi.HoodieBaseRelation._
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.avro.HoodieAvroUtils
@@ -41,6 +43,7 @@ import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
 import org.apache.hudi.io.storage.HoodieHFileReader
+
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
@@ -59,6 +62,7 @@ import org.apache.spark.unsafe.types.UTF8String
 
 import java.net.URI
 import java.util.Locale
+
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 import scala.util.{Failure, Success, Try}
@@ -139,7 +143,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     val internalSchemaOpt = if (!isSchemaEvolutionEnabled) {
       None
     } else {
-      Try(schemaResolver.getTableInternalSchemaFromCommitMetadata) match {
+      Try {
+        specifiedQueryTimestamp.map(schemaResolver.getTableInternalSchemaFromCommitMetadata)
+          .getOrElse(schemaResolver.getTableInternalSchemaFromCommitMetadata)
+      } match {
         case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt)
         case Failure(e) =>
           logWarning("Failed to fetch internal-schema from the table", e)
@@ -149,6 +156,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
     val avroSchema = internalSchemaOpt.map { is =>
      AvroInternalSchemaConverter.convert(is, "schema")
+    } orElse {
+      specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
     } orElse {
       schemaSpec.map(convertToAvroSchema)
     } getOrElse {
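The net effect of this change is a layered fallback whenever a time travel timestamp is specified: first the InternalSchema from the commit metadata as of that instant, then the historical Avro schema as of that instant, and only then the schema spec or the latest table schema. A simplified, self-contained sketch of that ordering, with hypothetical stand-ins for the real resolver calls:

import scala.util.{Failure, Success, Try}

object SchemaResolutionOrderSketch {
  // Hypothetical stand-ins for the resolver calls made by HoodieBaseRelation.
  def internalSchemaAsOf(ts: String): Option[String] = Some(s"internal-schema@$ts")
  def latestInternalSchema: Option[String] = Some("latest-internal-schema")
  def avroSchemaAsOf(ts: String): String = s"avro-schema@$ts"
  def latestAvroSchema: String = "latest-avro-schema"

  def resolve(specifiedQueryTimestamp: Option[String], schemaEvolutionEnabled: Boolean): String = {
    // Step 1: internal schema from commit metadata, pinned to the instant when one is given.
    val internalSchemaOpt = if (!schemaEvolutionEnabled) {
      None
    } else {
      Try {
        specifiedQueryTimestamp.map(internalSchemaAsOf).getOrElse(latestInternalSchema)
      } match {
        case Success(opt) => opt
        case Failure(_) => None
      }
    }
    // Step 2: historical Avro schema as of the instant; step 3: latest table schema.
    internalSchemaOpt
      .orElse(specifiedQueryTimestamp.map(avroSchemaAsOf))
      .getOrElse(latestAvroSchema)
  }

  def main(args: Array[String]): Unit = {
    println(resolve(Some("20220822103000"), schemaEvolutionEnabled = true))  // internal-schema@20220822103000
    println(resolve(Some("20220822103000"), schemaEvolutionEnabled = false)) // avro-schema@20220822103000
    println(resolve(None, schemaEvolutionEnabled = false))                   // latest-avro-schema
  }
}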

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala

Lines changed: 86 additions & 10 deletions
@@ -19,13 +19,16 @@ package org.apache.hudi.functional
 
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator}
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
+
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertNull, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
@@ -39,9 +42,8 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
     "hoodie.upsert.shuffle.parallelism" -> "4",
     "hoodie.bulkinsert.shuffle.parallelism" -> "2",
     "hoodie.delete.shuffle.parallelism" -> "1",
-    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
-    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
-    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "version",
     HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
   )
 
@@ -72,8 +74,6 @@
     df1.write.format("hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(RECORDKEY_FIELD.key, "id")
-      .option(PRECOMBINE_FIELD.key, "version")
       .option(PARTITIONPATH_FIELD.key, "")
       .option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
       .mode(SaveMode.Overwrite)
@@ -86,8 +86,6 @@
     df2.write.format("hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(RECORDKEY_FIELD.key, "id")
-      .option(PRECOMBINE_FIELD.key, "version")
       .option(PARTITIONPATH_FIELD.key, "")
       .option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
       .mode(SaveMode.Append)
@@ -100,8 +98,6 @@
     df3.write.format("hudi")
       .options(commonOpts)
      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(RECORDKEY_FIELD.key, "id")
-      .option(PRECOMBINE_FIELD.key, "version")
       .option(PARTITIONPATH_FIELD.key, "")
       .option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
       .mode(SaveMode.Append)
@@ -228,4 +224,84 @@
     val format = new SimpleDateFormat("yyyy-MM-dd")
     format.format(date)
   }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testTimeTravelQueryWithSchemaEvolution(tableType: HoodieTableType): Unit = {
+    initMetaClient(tableType)
+    val _spark = spark
+    import _spark.implicits._
+
+    // First write
+    val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version")
+    df1.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+      .option(PARTITIONPATH_FIELD.key, "")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(spark.sessionState.newHadoopConf)
+      .build()
+    val firstCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+    // Second write
+    val df2 = Seq((1, "a1", 12, 1001, "2022")).toDF("id", "name", "value", "version", "year")
+    df2.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+      .option(PARTITIONPATH_FIELD.key, "")
+      .mode(SaveMode.Append)
+      .save(basePath)
+    metaClient.reloadActiveTimeline()
+    val secondCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+    // Third write
+    val df3 = Seq((1, "a1", 13, 1002, "2022", "08")).toDF("id", "name", "value", "version", "year", "month")
+    df3.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+      .option(PARTITIONPATH_FIELD.key, "")
+      .mode(SaveMode.Append)
+      .save(basePath)
+    metaClient.reloadActiveTimeline()
+    val thirdCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+    val tableSchemaResolver = new TableSchemaResolver(metaClient)
+
+    // Query as of firstCommitTime
+    val result1 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, firstCommit)
+      .load(basePath)
+      .select("id", "name", "value", "version")
+      .take(1)(0)
+    assertEquals(Row(1, "a1", 10, 1000), result1)
+    val schema1 = tableSchemaResolver.getTableAvroSchema(firstCommit)
+    assertNull(schema1.getField("year"))
+    assertNull(schema1.getField("month"))
+
+    // Query as of secondCommitTime
+    val result2 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, secondCommit)
+      .load(basePath)
+      .select("id", "name", "value", "version", "year")
+      .take(1)(0)
+    assertEquals(Row(1, "a1", 12, 1001, "2022"), result2)
+    val schema2 = tableSchemaResolver.getTableAvroSchema(secondCommit)
+    assertNotNull(schema2.getField("year"))
+    assertNull(schema2.getField("month"))
+
+    // Query as of thirdCommitTime
+    val result3 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, thirdCommit)
+      .load(basePath)
+      .select("id", "name", "value", "version", "year", "month")
+      .take(1)(0)
+    assertEquals(Row(1, "a1", 13, 1002, "2022", "08"), result3)
+    val schema3 = tableSchemaResolver.getTableAvroSchema(thirdCommit)
+    assertNotNull(schema3.getField("year"))
+    assertNotNull(schema3.getField("month"))
+  }
 }
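Outside the test harness, the read path this test exercises can be sketched as follows (the base path and instant are assumed values; the hudi-spark bundle must be on the classpath):

import org.apache.hudi.DataSourceReadOptions
import org.apache.spark.sql.SparkSession

object TimeTravelReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("time-travel-read-sketch")
      .master("local[2]")
      .getOrCreate()

    val basePath = "/tmp/hoodie_test"   // assumed path of an existing Hudi table
    val asOfInstant = "20220822103000"  // assumed completed commit timestamp

    // With this fix, the frame below is read with the table schema that was
    // active at asOfInstant, so columns added by later commits do not appear.
    val snapshot = spark.read.format("hudi")
      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, asOfInstant)
      .load(basePath)

    snapshot.printSchema()
    spark.stop()
  }
}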

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala

Lines changed: 59 additions & 0 deletions
@@ -294,4 +294,63 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test Time Travel With Schema Evolution") {
+    if (HoodieSparkUtils.gteqSpark3_2) {
+      withTempDir { tmp =>
+        spark.sql("set hoodie.schema.on.read.enable=true")
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | tblproperties (
+             |   primaryKey = 'id',
+             |   preCombineField = 'ts'
+             | )
+             | location '${tmp.getCanonicalPath}/$tableName'
+       """.stripMargin)
+
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+
+        val metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(s"${tmp.getCanonicalPath}/$tableName")
+          .setConf(spark.sessionState.newHadoopConf())
+          .build()
+        val instant1 = metaClient.reloadActiveTimeline().getAllCommitsTimeline
+          .lastInstant().get().getTimestamp
+
+        // add column
+        spark.sql(s"alter table $tableName add columns (company string)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 11, 1100, 'hudi')")
+        val instant2 = metaClient.reloadActiveTimeline().getAllCommitsTimeline
+          .lastInstant().get().getTimestamp
+
+        // drop column
+        spark.sql(s"alter table $tableName drop column price")
+
+        val result1 = spark.sql(s"select * from ${tableName} timestamp as of $instant1 order by id")
+          .drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
+        checkAnswer(result1)(Seq(1, "a1", 10.0, 1000))
+
+        val result2 = spark.sql(s"select * from ${tableName} timestamp as of $instant2 order by id")
+          .drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
+        checkAnswer(result2)(
+          Seq(1, "a1", 10.0, 1000, null),
+          Seq(2, "a2", 11.0, 1100, "hudi")
+        )
+
+        val result3 = spark.sql(s"select * from ${tableName} order by id")
+          .drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
+        checkAnswer(result3)(
+          Seq(1, "a1", 1000, null),
+          Seq(2, "a2", 1100, "hudi")
+        )
+      }
+    }
+  }
 }
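The SQL path this test exercises can be sketched in isolation roughly as below (table name and instant are assumed; the "timestamp as of" syntax requires Spark 3.2+ and the Hudi session extension registered on the session):

import org.apache.spark.sql.SparkSession

object SqlTimeTravelSketch {
  def main(args: Array[String]): Unit = {
    // Assumes the hudi-spark bundle is on the classpath, a Hudi table named `hudi_tbl`
    // already exists in the catalog, and `20220822103000` is a completed instant.
    val spark = SparkSession.builder()
      .appName("sql-time-travel-sketch")
      .master("local[2]")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    spark.sql("set hoodie.schema.on.read.enable=true")

    // Returns rows in the schema that was active at the given instant,
    // even if columns were added or dropped afterwards.
    spark.sql("select * from hudi_tbl timestamp as of '20220822103000'").show(false)

    spark.stop()
  }
}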
