@@ -23,10 +23,12 @@
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.IndexedRecord;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -53,8 +55,10 @@
import org.apache.hudi.io.storage.HoodieHFileReader;
import org.apache.hudi.io.storage.HoodieOrcReader;
import org.apache.hudi.util.Lazy;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
@@ -138,6 +142,19 @@ public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception
return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
}

/**
* Fetches the table's schema in Avro format as of the given instant
*
* @param timestamp the instant as of which the table's schema will be fetched
*/
public Schema getTableAvroSchema(String timestamp) throws Exception {
Option<HoodieInstant> instant = metaClient.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants()
.findInstantsBeforeOrEquals(timestamp)
.lastInstant();
return getTableAvroSchemaInternal(metaClient.getTableConfig().populateMetaFields(), instant);
}

/**
* Fetches the table's schema in Avro format as of the given instant
*
@@ -496,6 +513,18 @@ public Option<InternalSchema> getTableInternalSchemaFromCommitMetadata() {
return timeline.lastInstant().flatMap(this::getTableInternalSchemaFromCommitMetadata);
}

/**
* Gets the InternalSchema for a hoodie table from the HoodieCommitMetadata of the latest
* completed instant at or before the given timestamp.
*
* @param timestamp as-of timestamp up to which the latest completed instant is resolved
* @return InternalSchema for this table
*/
public Option<InternalSchema> getTableInternalSchemaFromCommitMetadata(String timestamp) {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants()
.findInstantsBeforeOrEquals(timestamp);
return timeline.lastInstant().flatMap(this::getTableInternalSchemaFromCommitMetadata);
}

/**
* Gets the InternalSchema for a hoodie table from the HoodieCommitMetadata of the instant.
*
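Note: a minimal usage sketch (not part of this diff) of the two new timestamp-based overloads above, assuming basePath, hadoopConf and asOfTimestamp refer to an existing Hudi table and a completed instant, mirroring the tests further down:

import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}

// Build a meta client for the table, then resolve its schemas as of a past instant.
val metaClient = HoodieTableMetaClient.builder()
  .setBasePath(basePath)
  .setConf(hadoopConf)
  .build()
val schemaResolver = new TableSchemaResolver(metaClient)

// Avro schema of the latest completed commit at or before asOfTimestamp.
val avroSchemaAsOf = schemaResolver.getTableAvroSchema(asOfTimestamp)
// Evolution-aware InternalSchema recorded for that commit, if any.
val internalSchemaAsOf = schemaResolver.getTableInternalSchemaFromCommitMetadata(asOfTimestamp)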
@@ -19,10 +19,12 @@ package org.apache.hudi

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hadoop.mapred.JobConf

import org.apache.hudi.HoodieBaseRelation._
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.HoodieAvroUtils
@@ -41,6 +43,7 @@ import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
import org.apache.hudi.io.storage.HoodieHFileReader

import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -59,6 +62,7 @@ import org.apache.spark.unsafe.types.UTF8String

import java.net.URI
import java.util.Locale

import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
@@ -139,7 +143,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
val internalSchemaOpt = if (!isSchemaEvolutionEnabled) {
None
} else {
Try(schemaResolver.getTableInternalSchemaFromCommitMetadata) match {
Try {
specifiedQueryTimestamp.map(schemaResolver.getTableInternalSchemaFromCommitMetadata)
.getOrElse(schemaResolver.getTableInternalSchemaFromCommitMetadata)
} match {
case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt)
case Failure(e) =>
logWarning("Failed to fetch internal-schema from the table", e)
@@ -149,6 +156,8 @@

val avroSchema = internalSchemaOpt.map { is =>
AvroInternalSchemaConverter.convert(is, "schema")
} orElse {
specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
} orElse {
schemaSpec.map(convertToAvroSchema)
} getOrElse {
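Note: with this change, when a query timestamp is specified the relation resolves its schema in the following order: the evolution-aware InternalSchema from commit metadata as of that timestamp (when schema-on-read is enabled), else the table's Avro schema as of that timestamp, else the user-specified schemaSpec, else the pre-existing default branch (elided above). A minimal sketch of a read that exercises this path, assuming spark, basePath and firstCommit as in the tests below:

import org.apache.hudi.DataSourceReadOptions

// Time-travel read: the relation now resolves the schema in effect at firstCommit
// rather than the latest table schema.
val asOfDf = spark.read.format("hudi")
  .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, firstCommit)
  .load(basePath)
asOfDf.printSchema()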
@@ -19,13 +19,16 @@ package org.apache.hudi.functional

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator}
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}

import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}

import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertNull, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
@@ -39,9 +42,8 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
"hoodie.upsert.shuffle.parallelism" -> "4",
"hoodie.bulkinsert.shuffle.parallelism" -> "2",
"hoodie.delete.shuffle.parallelism" -> "1",
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "version",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
)

@@ -72,8 +74,6 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
df1.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
.option(RECORDKEY_FIELD.key, "id")
.option(PRECOMBINE_FIELD.key, "version")
.option(PARTITIONPATH_FIELD.key, "")
.option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
.mode(SaveMode.Overwrite)
@@ -86,8 +86,6 @@
df2.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
.option(RECORDKEY_FIELD.key, "id")
.option(PRECOMBINE_FIELD.key, "version")
.option(PARTITIONPATH_FIELD.key, "")
.option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
.mode(SaveMode.Append)
@@ -100,8 +98,6 @@
df3.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
.option(RECORDKEY_FIELD.key, "id")
.option(PRECOMBINE_FIELD.key, "version")
.option(PARTITIONPATH_FIELD.key, "")
.option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
.mode(SaveMode.Append)
@@ -228,4 +224,84 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
val format = new SimpleDateFormat("yyyy-MM-dd")
format.format(date)
}

@ParameterizedTest
@EnumSource(value = classOf[HoodieTableType])
def testTimeTravelQueryWithSchemaEvolution(tableType: HoodieTableType): Unit = {
initMetaClient(tableType)
val _spark = spark
import _spark.implicits._

// First write
val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version")
df1.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
.option(PARTITIONPATH_FIELD.key, "")
.mode(SaveMode.Overwrite)
.save(basePath)

metaClient = HoodieTableMetaClient.builder()
.setBasePath(basePath)
.setConf(spark.sessionState.newHadoopConf)
.build()
val firstCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp

// Second write
val df2 = Seq((1, "a1", 12, 1001, "2022")).toDF("id", "name", "value", "version", "year")
df2.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
.option(PARTITIONPATH_FIELD.key, "")
.mode(SaveMode.Append)
.save(basePath)
metaClient.reloadActiveTimeline()
val secondCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp

// Third write
val df3 = Seq((1, "a1", 13, 1002, "2022", "08")).toDF("id", "name", "value", "version", "year", "month")
df3.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
.option(PARTITIONPATH_FIELD.key, "")
.mode(SaveMode.Append)
.save(basePath)
metaClient.reloadActiveTimeline()
val thirdCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp

val tableSchemaResolver = new TableSchemaResolver(metaClient)

// Query as of firstCommitTime
val result1 = spark.read.format("hudi")
.option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, firstCommit)
.load(basePath)
.select("id", "name", "value", "version")
.take(1)(0)
assertEquals(Row(1, "a1", 10, 1000), result1)
val schema1 = tableSchemaResolver.getTableAvroSchema(firstCommit)
assertNull(schema1.getField("year"))
assertNull(schema1.getField("month"))

// Query as of secondCommitTime
val result2 = spark.read.format("hudi")
.option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, secondCommit)
.load(basePath)
.select("id", "name", "value", "version", "year")
.take(1)(0)
assertEquals(Row(1, "a1", 12, 1001, "2022"), result2)
val schema2 = tableSchemaResolver.getTableAvroSchema(secondCommit)
assertNotNull(schema2.getField("year"))
assertNull(schema2.getField("month"))

// Query as of thirdCommitTime
val result3 = spark.read.format("hudi")
.option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, thirdCommit)
.load(basePath)
.select("id", "name", "value", "version", "year", "month")
.take(1)(0)
assertEquals(Row(1, "a1", 13, 1002, "2022", "08"), result3)
val schema3 = tableSchemaResolver.getTableAvroSchema(thirdCommit)
assertNotNull(schema3.getField("year"))
assertNotNull(schema3.getField("month"))
}
}
@@ -294,4 +294,63 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase {
}
}
}

test("Test Time Travel With Schema Evolution") {
if (HoodieSparkUtils.gteqSpark3_2) {
withTempDir { tmp =>
spark.sql("set hoodie.schema.on.read.enable=true")
val tableName = generateTableName
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
| location '${tmp.getCanonicalPath}/$tableName'
""".stripMargin)

spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")

val metaClient = HoodieTableMetaClient.builder()
.setBasePath(s"${tmp.getCanonicalPath}/$tableName")
.setConf(spark.sessionState.newHadoopConf())
.build()
val instant1 = metaClient.reloadActiveTimeline().getAllCommitsTimeline
.lastInstant().get().getTimestamp

// add column
spark.sql(s"alter table $tableName add columns (company string)")
spark.sql(s"insert into $tableName values(2, 'a2', 11, 1100, 'hudi')")
val instant2 = metaClient.reloadActiveTimeline().getAllCommitsTimeline
.lastInstant().get().getTimestamp

// drop column
spark.sql(s"alter table $tableName drop column price")

val result1 = spark.sql(s"select * from ${tableName} timestamp as of $instant1 order by id")
.drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
checkAnswer(result1)(Seq(1, "a1", 10.0, 1000))

val result2 = spark.sql(s"select * from ${tableName} timestamp as of $instant2 order by id")
.drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
checkAnswer(result2)(
Seq(1, "a1", 10.0, 1000, null),
Seq(2, "a2", 11.0, 1100, "hudi")
)

val result3 = spark.sql(s"select * from ${tableName} order by id")
.drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
checkAnswer(result3)(
Seq(1, "a1", 1000, null),
Seq(2, "a2", 1100, "hudi")
)
}
}
}
}