
Commit ccfaf01

yihua authored and codope committed
[HUDI-4540] Cover different table types in functional tests of Spark structured streaming (#6317)
1 parent cd14c26 commit ccfaf01

1 file changed

Lines changed: 90 additions & 74 deletions

File tree

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala

@@ -18,11 +18,13 @@
 package org.apache.hudi.functional
 
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestTable}
-import org.apache.hudi.config.{HoodieClusteringConfig, HoodieStorageConfig, HoodieWriteConfig}
+import org.apache.hudi.common.util.CollectionUtils
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieStorageConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.TableNotFoundException
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
@@ -31,7 +33,9 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{OutputMode, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
 
 import scala.collection.JavaConversions._
 import scala.concurrent.ExecutionContext.Implicits.global
@@ -99,8 +103,30 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     (sourcePath, destPath)
   }
 
-  @Test
-  def testStructuredStreaming(): Unit = {
+  def getOptsWithTableType(tableType: HoodieTableType): Map[String, String] = {
+    commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
+  }
+
+  def getClusteringOpts(tableType: HoodieTableType, isInlineClustering: String,
+                        isAsyncClustering: String, clusteringNumCommit: String,
+                        fileMaxRecordNum: Int): Map[String, String] = {
+    getOptsWithTableType(tableType) + (
+      HoodieClusteringConfig.INLINE_CLUSTERING.key -> isInlineClustering,
+      HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,
+      DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
+      HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,
+      HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key -> dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString
+    )
+  }
+
+  def getCompactionOpts(tableType: HoodieTableType, isAsyncCompaction: Boolean): Map[String, String] = {
+    getOptsWithTableType(tableType) + (
+      DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> isAsyncCompaction.toString,
+      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "1"
+    )
+  }
+
+  def structuredStreamingTestRunner(tableType: HoodieTableType, addCompactionConfigs: Boolean, isAsyncCompaction: Boolean): Unit = {
     val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
     // First chunk of data
     val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
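Note: the new option builders above layer the table type, clustering, and compaction settings on top of the suite's commonOpts. A minimal sketch of how a test composes options for a MERGE_ON_READ run with async compaction; the value comments restate the helper's definition above, not captured output:

    // Hedged sketch: build write options the way structuredStreamingTestRunner does.
    val opts: Map[String, String] =
      getCompactionOpts(HoodieTableType.MERGE_ON_READ, isAsyncCompaction = true)
    // opts extends commonOpts with:
    //   DataSourceWriteOptions.TABLE_TYPE.key           -> "MERGE_ON_READ"
    //   DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> "true"
    //   HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "1"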
@@ -111,7 +137,12 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
 
-    val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, commonOpts)
+    val hudiOptions = if (addCompactionConfigs) {
+      getCompactionOpts(tableType, isAsyncCompaction)
+    } else {
+      getOptsWithTableType(tableType)
+    }
+    val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, hudiOptions)
 
     val f2 = Future {
       inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
@@ -125,16 +156,23 @@ class TestStructuredStreaming extends HoodieClientTestBase {
       assert(hoodieROViewDF1.count() == 100)
 
       inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
-      // wait for spark streaming to process second microbatch
-      waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
-      val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, destPath)
-      assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
+      // When the compaction configs are added, one more commit of the compaction is expected
+      val numExpectedCommits = if (addCompactionConfigs) currNumCommits + 2 else currNumCommits + 1
+      waitTillAtleastNCommits(fs, destPath, numExpectedCommits, 120, 5)
+
+      val commitInstantTime2 = if (tableType == HoodieTableType.MERGE_ON_READ) {
+        // For the records that are processed by the compaction in MOR table
+        // the "_hoodie_commit_time" still reflects the latest delta commit
+        latestInstant(fs, destPath, HoodieTimeline.DELTA_COMMIT_ACTION)
+      } else {
+        HoodieDataSourceHelpers.latestCommit(fs, destPath)
+      }
+      assertEquals(numExpectedCommits, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
       // Read RO View
       val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
         .load(destPath + "/*/*/*/*")
       assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only updated
 
-
       // Read Incremental View
       // we have 2 commits, try pulling the first commit (which is not the latest)
       val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").get(0)
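Note: the commit arithmetic in this hunk follows from the compaction settings: with INLINE_COMPACT_NUM_DELTA_COMMITS set to "1", the second microbatch's delta commit is expected to be followed by one compaction commit. A sketch restating the accounting above:

    // Without compaction configs: the second microbatch adds one commit.
    // With compaction configs: the delta commit plus one compaction commit.
    val numExpectedCommits =
      if (addCompactionConfigs) currNumCommits + 2 else currNumCommits + 1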
@@ -163,6 +201,12 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
   }
 
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testStructuredStreaming(tableType: HoodieTableType): Unit = {
+    structuredStreamingTestRunner(tableType, false, false)
+  }
+
   @throws[InterruptedException]
   private def waitTillAtleastNCommits(fs: FileSystem, tablePath: String,
                                       numCommits: Int, timeoutSecs: Int, sleepSecsAfterEachRun: Int) = {
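Note: the single-table-type @Test becomes a JUnit 5 parameterized test; @EnumSource runs it once per HoodieTableType constant (COPY_ON_WRITE and MERGE_ON_READ). A self-contained sketch of the pattern; the class and method names are illustrative, not part of the commit:

    import org.apache.hudi.common.model.HoodieTableType
    import org.junit.jupiter.params.ParameterizedTest
    import org.junit.jupiter.params.provider.EnumSource

    class EnumSourceSketch {
      // JUnit injects each HoodieTableType value in turn, one invocation per constant.
      @ParameterizedTest
      @EnumSource(value = classOf[HoodieTableType])
      def runsOncePerTableType(tableType: HoodieTableType): Unit = {
        assert(tableType != null)
      }
    }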
@@ -178,8 +222,6 @@ class TestStructuredStreaming extends HoodieClientTestBase {
           numInstants = timeline.countInstants
           success = true
         }
-        val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath)
-          .setLoadActiveTimelineOnLoad(true).build()
       } catch {
         case te: TableNotFoundException =>
           log.info("Got table not found exception. Retrying")
@@ -193,61 +235,30 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     numInstants
   }
 
-  def getClusteringOpts(isInlineClustering: String, isAsyncClustering: String, isAsyncCompaction: String,
-                        clusteringNumCommit: String, fileMaxRecordNum: Int):Map[String, String] = {
-    commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING.key -> isInlineClustering,
-      HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,
-      DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
-      DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> isAsyncCompaction,
-      HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,
-      HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key -> dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString
-    )
-  }
-
-  @Test
-  def testStructuredStreamingWithInlineClustering(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testStructuredStreamingWithClustering(isAsyncClustering: Boolean): Unit = {
     val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
 
-    def checkClusteringResult(destPath: String):Unit = {
+    def checkClusteringResult(destPath: String): Unit = {
       // check have schedule clustering and clustering file group to one
       waitTillHasCompletedReplaceInstant(destPath, 120, 1)
       metaClient.reloadActiveTimeline()
       assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
     }
-    structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, false, false,
-      HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
-  }
-
-  @Test
-  def testStructuredStreamingWithAsyncClustering(): Unit = {
-    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
 
-    def checkClusteringResult(destPath: String):Unit = {
-      // check have schedule clustering and clustering file group to one
-      waitTillHasCompletedReplaceInstant(destPath, 120, 1)
-      metaClient.reloadActiveTimeline()
-      assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
-    }
-    structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, true, false,
-      HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
+    structuredStreamingForTestClusteringRunner(sourcePath, destPath, HoodieTableType.COPY_ON_WRITE,
+      !isAsyncClustering, isAsyncClustering, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
   }
 
-  @Test
-  def testStructuredStreamingWithAsyncClusteringAndCompaction(): Unit = {
-    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
-
-    def checkClusteringResult(destPath: String):Unit = {
-      // check have schedule clustering and clustering file group to one
-      waitTillHasCompletedReplaceInstant(destPath, 120, 1)
-      metaClient.reloadActiveTimeline()
-      assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
-    }
-    structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, true, true,
-      HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testStructuredStreamingWithCompaction(isAsyncCompaction: Boolean): Unit = {
+    structuredStreamingTestRunner(HoodieTableType.MERGE_ON_READ, true, isAsyncCompaction)
   }
 
-  def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, isInlineClustering: Boolean,
-                                                 isAsyncClustering: Boolean, isAsyncCompaction: Boolean,
+  def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, tableType: HoodieTableType,
+                                                 isInlineClustering: Boolean, isAsyncClustering: Boolean,
                                                  partitionOfRecords: String, checkClusteringResult: String => Unit): Unit = {
     // First insert of data
     val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, partitionOfRecords)).toList
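Note: the three clustering tests collapse into one boolean-parameterized test. Passing !isAsyncClustering and isAsyncClustering as the inline/async flags means exactly one clustering mode is active per invocation:

    // isAsyncClustering = false -> inline clustering (INLINE_CLUSTERING = "true", ASYNC_CLUSTERING_ENABLE = "false")
    // isAsyncClustering = true  -> async clustering  (INLINE_CLUSTERING = "false", ASYNC_CLUSTERING_ENABLE = "true")
    val (inlineFlag, asyncFlag) = (!isAsyncClustering, isAsyncClustering)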
@@ -257,8 +268,8 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     val records2 = recordsToStrings(dataGen.generateInsertsForPartition("001", 100, partitionOfRecords)).toList
     val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
 
-    val hudiOptions = getClusteringOpts(isInlineClustering.toString, isAsyncClustering.toString,
-      isAsyncCompaction.toString, "2", 100)
+    val hudiOptions = getClusteringOpts(
+      tableType, isInlineClustering.toString, isAsyncClustering.toString, "2", 100)
     val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, hudiOptions)
 
     val f2 = Future {
@@ -270,28 +281,24 @@ class TestStructuredStreaming extends HoodieClientTestBase {
       inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
       // wait for spark streaming to process second microbatch
      currNumCommits = waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
-      // for inline clustering, clustering may be complete along with 2nd commit
-      if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, destPath).getCompletedReplaceTimeline.countInstants() > 0) {
-        assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
-        // check have at least one file group
-        this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
-          .setLoadActiveTimelineOnLoad(true).build()
-        assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0)
-      } else {
-        assertEquals(currNumCommits, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
-        // check have more than one file group
-        this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
-          .setLoadActiveTimelineOnLoad(true).build()
-        assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 1)
-      }
 
-      // check clustering result
+      // Wait for the clustering to finish
+      this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
+        .setLoadActiveTimelineOnLoad(true).build()
       checkClusteringResult(destPath)
 
-      // check data correct after clustering
+      assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
+      // Check have at least one file group
+      assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0)
+
+      // Validate data after clustering
       val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
         .load(destPath + "/*/*/*/*")
       assertEquals(200, hoodieROViewDF2.count())
+      val countsPerCommit = hoodieROViewDF2.groupBy("_hoodie_commit_time").count().collect()
+      assertEquals(2, countsPerCommit.length)
+      val commitInstantTime2 = latestInstant(fs, destPath, HoodieTimeline.COMMIT_ACTION)
+      assertEquals(commitInstantTime2, countsPerCommit.maxBy(row => row.getAs[String](0)).get(0))
     }
     Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
   }
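Note: the replaced branching assertions give way to a single post-clustering validation: group rows by the _hoodie_commit_time metadata column, expect one group per insert batch, and check that the newest commit time in the data matches the last completed commit on the timeline. A sketch of the grouping step:

    // Two insert batches of 100 records each -> two commit-time groups.
    val countsPerCommit = hoodieROViewDF2.groupBy("_hoodie_commit_time").count().collect()
    assert(countsPerCommit.length == 2)
    // Commit times are lexicographically ordered timestamps, so maxBy picks the newest.
    val newestCommitTime = countsPerCommit.maxBy(_.getAs[String](0)).getAs[String](0)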
@@ -327,4 +334,13 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     if (!success) throw new IllegalStateException("Timed-out waiting for completing replace instant appear in " + tablePath)
   }
 
+  private def latestInstant(fs: FileSystem, basePath: String, instantAction: String): String = {
+    val metaClient = HoodieTableMetaClient.builder
+      .setConf(fs.getConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build
+    metaClient.getActiveTimeline
+      .getTimelineOfActions(CollectionUtils.createSet(instantAction))
+      .filterCompletedInstants
+      .lastInstant
+      .get.getTimestamp
+  }
 }
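Note: latestInstant scans the active timeline for one action type and returns the timestamp of the last completed instant; the MERGE_ON_READ branch earlier uses it with HoodieTimeline.DELTA_COMMIT_ACTION. A hedged usage sketch, with an illustrative base path:

    // Fetch the last completed delta commit of a (hypothetical) table at /tmp/hudi_dest.
    val lastDeltaCommit: String =
      latestInstant(fs, "/tmp/hudi_dest", HoodieTimeline.DELTA_COMMIT_ACTION)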
