
Commit 378a375

Author: Alexey Kudinkin (committed)
Extracted TestBasicSchemaEvolution as standalone test
1 parent 6ebbcc9 commit 378a375

3 files changed

Lines changed: 321 additions & 188 deletions

File tree

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/ScalaAssertionSupport.scala

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ trait ScalaAssertionSupport {
         return t.asInstanceOf[T]
         // scalastyle:on return
       case ot @ _ =>
-        fail(s"Expected exception of class $expectedExceptionClass, but ${ot.getClass} has been thrown")
+        fail(s"Expected exception of class $expectedExceptionClass, but ${ot.getClass} has been thrown: $ot\n${ot.getStackTrace.mkString("\n")}")
     }

     fail(s"Expected exception of class $expectedExceptionClass, but nothing has been thrown")
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala

Lines changed: 319 additions & 0 deletions
@@ -0,0 +1,319 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.functional

import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.{HoodieUpsertException, SchemaCompatibilityException}
import org.apache.hudi.functional.TestBasicSchemaEvolution.{dropColumn, injectColumnAt}
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.util.JFunction
import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, ScalaAssertionSupport}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{HoodieUnsafeUtils, Row, SaveMode, SparkSession, SparkSessionExtensions}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource

import java.util.function.Consumer
import scala.collection.JavaConverters._
import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`

class TestBasicSchemaEvolution extends HoodieClientTestBase with ScalaAssertionSupport {

  var spark: SparkSession = null
  val commonOpts = Map(
    "hoodie.insert.shuffle.parallelism" -> "4",
    "hoodie.upsert.shuffle.parallelism" -> "4",
    "hoodie.bulkinsert.shuffle.parallelism" -> "2",
    "hoodie.delete.shuffle.parallelism" -> "1",
    HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key() -> "true",
    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
  )

  val verificationCol: String = "driver"
  val updatedVerificationVal: String = "driver_update"

  override def getSparkSessionExtensionsInjector: util.Option[Consumer[SparkSessionExtensions]] =
    toJavaOption(
      Some(
        JFunction.toJava((receiver: SparkSessionExtensions) => new HoodieSparkSessionExtension().apply(receiver)))
    )

  @BeforeEach override def setUp() {
    initPath()
    initSparkContexts()
    spark = sqlContext.sparkSession
    initTestDataGenerator()
    initFileSystem()
  }

  @AfterEach override def tearDown() = {
    cleanupSparkContexts()
    cleanupTestDataGenerator()
    cleanupFileSystem()
    FileSystem.closeAll()
    System.gc()
  }

  @ParameterizedTest
  @CsvSource(value = Array(
    "bulk_insert,true", "bulk_insert,false",
    "insert,true", "insert,false",
    "upsert,true", "upsert,false"
  ))
  def testBasicSchemaEvolution(opType: String, shouldReconcileSchema: Boolean): Unit = {
    // enable Avro schema validation on write
    val opts = commonOpts ++
      Map(
        HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key -> "true",
        DataSourceWriteOptions.RECONCILE_SCHEMA.key -> shouldReconcileSchema.toString,
        DataSourceWriteOptions.OPERATION.key -> opType
      )

    def appendData(schema: StructType, batch: Seq[Row]): Unit = {
      HoodieUnsafeUtils.createDataFrameFromRows(spark, batch, schema)
        .write
        .format("org.apache.hudi")
        .options(opts)
        .mode(SaveMode.Append)
        .save(basePath)
    }

    def loadTable: (StructType, Seq[Row]) = {
      val tableMetaClient = HoodieTableMetaClient.builder()
        .setConf(spark.sparkContext.hadoopConfiguration)
        .setBasePath(basePath)
        .build()

      tableMetaClient.reloadActiveTimeline()

      val resolver = new TableSchemaResolver(tableMetaClient)
      val latestTableSchema = AvroConversionUtils.convertAvroSchemaToStructType(resolver.getTableAvroSchema(false))

      val df =
        spark.read.format("org.apache.hudi")
          .load(basePath + "/*/*")
          .drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)
          .orderBy("_row_key")

      (latestTableSchema, df.collectAsList().toSeq)
    }

    //
    // 1. Write 1st batch with schema A
    //

    val firstSchema = StructType(
      StructField("_row_key", StringType, nullable = true) ::
        StructField("first_name", StringType, nullable = false) ::
        StructField("last_name", StringType, nullable = true) ::
        StructField("timestamp", IntegerType, nullable = true) ::
        StructField("partition", IntegerType, nullable = true) :: Nil)

    val firstBatch = Seq(
      Row("1", "Andy", "Cooper", 1, 1),
      Row("2", "Lisi", "Wallace", 1, 1),
      Row("3", "Zhangsan", "Shu", 1, 1))

    HoodieUnsafeUtils.createDataFrameFromRows(spark, firstBatch, firstSchema)
      .write
      .format("org.apache.hudi")
      .options(opts)
      .mode(SaveMode.Overwrite)
      .save(basePath)

    //
    // 2. Write 2nd batch with another schema (added column `age`)
    //

    val secondSchema = StructType(
      StructField("_row_key", StringType, nullable = true) ::
        StructField("first_name", StringType, nullable = false) ::
        StructField("last_name", StringType, nullable = true) ::
        StructField("age", StringType, nullable = true) ::
        StructField("timestamp", IntegerType, nullable = true) ::
        StructField("partition", IntegerType, nullable = true) :: Nil)

    val secondBatch = Seq(
      Row("4", "John", "Green", "10", 1, 1),
      Row("5", "Jack", "Sparrow", "13", 1, 1),
      Row("6", "Jill", "Fiorella", "12", 1, 1))

    appendData(secondSchema, secondBatch)
    val (tableSchemaAfterSecondBatch, rowsAfterSecondBatch) = loadTable

    // NOTE: In case schema reconciliation is ENABLED, Hudi prefers the table's schema over the new batch's
    // schema, so the table's schema stays the same after the commit, shedding the (newly added) columns
    // that are present in the batch's schema but not in the table's.
    //
    // In case schema reconciliation is DISABLED, the table adopts the batch's schema, meaning the added
    // columns are filled in with nulls for the table's existing records, provided the new column is
    // nullable (the write fails otherwise).
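    // For example (rows taken from the batches above): with reconciliation enabled, the written
    // Row("4", "John", "Green", "10", 1, 1) reads back without its `age` value as
    // Row("4", "John", "Green", 1, 1); with it disabled, the pre-existing
    // Row("1", "Andy", "Cooper", 1, 1) reads back as Row("1", "Andy", "Cooper", null, 1, 1).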
    if (shouldReconcileSchema) {
      assertEquals(firstSchema, tableSchemaAfterSecondBatch)

      val ageColOrd = secondSchema.indexWhere(_.name == "age")
      val expectedRows = firstBatch ++ dropColumn(secondBatch, ageColOrd)

      assertEquals(expectedRows, rowsAfterSecondBatch)
    } else {
      assertEquals(secondSchema, tableSchemaAfterSecondBatch)

      val ageColOrd = secondSchema.indexWhere(_.name == "age")
      val expectedRows = injectColumnAt(firstBatch, ageColOrd, null) ++ secondBatch

      assertEquals(expectedRows, rowsAfterSecondBatch)
    }

    //
    // 3. Write 3rd batch with another schema (w/ the _nullable_ column `last_name` omitted, expected to succeed)
    //

    val thirdSchema = StructType(
      StructField("_row_key", StringType, nullable = true) ::
        StructField("first_name", StringType, nullable = false) ::
        StructField("age", StringType, nullable = true) ::
        StructField("timestamp", IntegerType, nullable = true) ::
        StructField("partition", IntegerType, nullable = true) :: Nil)

    val thirdBatch = Seq(
      Row("7", "Harry", "15", 1, 1),
      Row("8", "Ron", "14", 1, 1),
      Row("9", "Germiona", "16", 1, 1))

    appendData(thirdSchema, thirdBatch)
    val (tableSchemaAfterThirdBatch, rowsAfterThirdBatch) = loadTable

    // NOTE: In case schema reconciliation is ENABLED, Hudi prefers the table's schema over the new batch's
    // schema, so the table's schema stays the same after the commit, adding the (dropped) columns back
    // to the records in the batch (set to null).
    //
    // In case schema reconciliation is DISABLED, the table adopts the batch's schema, meaning the values
    // of the dropped columns are discarded for the table's existing records.
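    // For example: with reconciliation enabled, Row("7", "Harry", "15", 1, 1) from the batch above is
    // stored as Row("7", "Harry", null, 1, 1) (the `last_name` slot re-added as null, `age` shed);
    // with it disabled, previously written rows lose their `last_name` values on read.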
    if (shouldReconcileSchema) {
      assertEquals(firstSchema, tableSchemaAfterThirdBatch)

      val ageColOrd = secondSchema.indexWhere(_.name == "age")
      val lastNameColOrd = firstSchema.indexWhere(_.name == "last_name")

      val expectedRows = rowsAfterSecondBatch ++ dropColumn(injectColumnAt(thirdBatch, lastNameColOrd, null), ageColOrd)

      assertEquals(expectedRows, rowsAfterThirdBatch)
    } else {
      assertEquals(thirdSchema, tableSchemaAfterThirdBatch)

      val lastNameColOrd = secondSchema.indexWhere(_.name == "last_name")

      val expectedRows =
        dropColumn(rowsAfterSecondBatch, lastNameColOrd) ++ thirdBatch

      assertEquals(expectedRows, rowsAfterThirdBatch)
    }

    //
    // 4. Write 4th batch with another schema (w/ the _non-nullable_ column `first_name` omitted; expected
    // to fail when schema reconciliation is enabled, expected to succeed otherwise)
    //

    val fourthSchema = StructType(
      StructField("_row_key", StringType, nullable = true) ::
        StructField("age", StringType, nullable = true) ::
        StructField("timestamp", IntegerType, nullable = true) ::
        StructField("partition", IntegerType, nullable = true) :: Nil)

    val fourthBatch = Seq(
      Row("10", "15", 1, 1),
      Row("11", "14", 1, 1),
      Row("12", "16", 1, 1))

    // NOTE: In case schema reconciliation is ENABLED, Hudi prefers the table's schema over the new batch's
    // schema, so the table's schema stays the same after the commit, adding the (dropped) columns back
    // to the records in the batch. Since the batch omits a column designated as non-nullable, the write is
    // expected to fail (being unable to set the missing column's values to null).
    //
    // In case schema reconciliation is DISABLED, the table adopts the batch's schema, meaning the values
    // of the dropped columns are discarded for the table's existing records.
    if (shouldReconcileSchema) {
      assertThrows(classOf[SchemaCompatibilityException]) {
        appendData(fourthSchema, fourthBatch)
      }
    } else {
      appendData(fourthSchema, fourthBatch)
      val (latestTableSchema, rows) = loadTable

      assertEquals(fourthSchema, latestTableSchema)

      val firstNameColOrd = thirdSchema.indexWhere(_.name == "first_name")

      val expectedRecords =
        dropColumn(rowsAfterThirdBatch, firstNameColOrd) ++ fourthBatch

      assertEquals(expectedRecords, rows)
    }

    //
    // 5. Write 5th batch with another schema (w/ the data type of column `timestamp` changed, expected to fail)
    //

    val fifthSchema = StructType(
      StructField("_row_key", StringType, nullable = true) ::
        StructField("age", StringType, nullable = true) ::
        StructField("timestamp", StringType, nullable = true) ::
        StructField("partition", IntegerType, nullable = true) :: Nil)

    val fifthBatch = Seq(
      Row("10", "15", "1", 1),
      Row("11", "14", "1", 1),
      Row("12", "16", "1", 1))

    // NOTE: Expected to fail in both cases, as such a transformation is not permitted
    assertThrows(classOf[SchemaCompatibilityException]) {
      appendData(fifthSchema, fifthBatch)
    }
  }
}

object TestBasicSchemaEvolution {

  def dropColumn(rows: Seq[Row], idx: Int): Seq[Row] =
    rows.map { r =>
      val values = r.toSeq.zipWithIndex
        .filterNot { case (_, cidx) => cidx == idx }
        .map { case (c, _) => c }
      Row(values: _*)
    }

  def injectColumnAt(rows: Seq[Row], idx: Int, value: Any): Seq[Row] =
    rows.map { r =>
      val (left, right) = r.toSeq.splitAt(idx)
      val values = (left :+ value) ++ right
      Row(values: _*)
    }
}
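For reference, the two row-shaping helpers above can be exercised standalone like so (a small usage sketch; the sample row mirrors the first batch in the test):

    import org.apache.spark.sql.Row
    import org.apache.hudi.functional.TestBasicSchemaEvolution.{dropColumn, injectColumnAt}

    val rows = Seq(Row("1", "Andy", "Cooper", 1, 1))

    // Drop the value at ordinal 2 (the `last_name` slot):
    assert(dropColumn(rows, 2) == Seq(Row("1", "Andy", 1, 1)))

    // Splice a null back in at ordinal 2:
    assert(injectColumnAt(dropColumn(rows, 2), 2, null) == Seq(Row("1", "Andy", null, 1, 1)))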
