@@ -32,7 +32,7 @@ import org.apache.hudi.avro.HoodieAvroUtils
3232import org .apache .hudi .common .config .{HoodieMetadataConfig , TypedProperties }
3333import org .apache .hudi .common .model ._
3434import org .apache .hudi .common .table .HoodieTableMetaClient
35- import org .apache .hudi .common .table .cdc .HoodieCDCInferCase ._
35+ import org .apache .hudi .common .table .cdc .HoodieCDCInferenceCase ._
3636import org .apache .hudi .common .table .cdc .HoodieCDCOperation ._
3737import org .apache .hudi .common .table .cdc .HoodieCDCSupplementalLoggingMode ._
3838import org .apache .hudi .common .table .cdc .{HoodieCDCFileSplit , HoodieCDCUtils }
@@ -81,10 +81,11 @@ class HoodieCDCRDD(
8181 originTableSchema : HoodieTableSchema ,
8282 cdcSchema : StructType ,
8383 requiredCdcSchema : StructType ,
84- changes : Array [HoodieCDCFileGroupSplit ])
84+ @ transient changes : Array [HoodieCDCFileGroupSplit ])
8585 extends RDD [InternalRow ](spark.sparkContext, Nil ) with HoodieUnsafeRDD {
8686
87- @ transient private val hadoopConf = spark.sparkContext.hadoopConfiguration
87+ @ transient
88+ private val hadoopConf = spark.sparkContext.hadoopConfiguration
8889
8990 private val confBroadcast = spark.sparkContext.broadcast(new SerializableWritable (hadoopConf))
9091
@@ -118,7 +119,7 @@ class HoodieCDCRDD(
118119
119120 private lazy val fs = metaClient.getFs.getFileSystem
120121
121- private lazy val conf = new Configuration ( confBroadcast.value.value)
122+ private lazy val conf = confBroadcast.value.value
122123
123124 private lazy val basePath = metaClient.getBasePathV2
124125
@@ -127,11 +128,7 @@ class HoodieCDCRDD(
127128 private lazy val populateMetaFields = tableConfig.populateMetaFields()
128129
129130 private lazy val keyGenerator = {
130- val props = new TypedProperties ()
131- props.put(HoodieWriteConfig .KEYGENERATOR_CLASS_NAME .key, tableConfig.getKeyGeneratorClassName)
132- props.put(KeyGeneratorOptions .RECORDKEY_FIELD_NAME .key, tableConfig.getRecordKeyFieldProp)
133- props.put(KeyGeneratorOptions .PARTITIONPATH_FIELD_NAME .key, tableConfig.getPartitionFieldProp)
134- HoodieSparkKeyGeneratorFactory .createKeyGenerator(props)
131+ HoodieSparkKeyGeneratorFactory .createKeyGenerator(tableConfig.getProps())
135132 }
136133
137134 private lazy val recordKeyField : String = if (populateMetaFields) {
@@ -202,7 +199,7 @@ class HoodieCDCRDD(
202199 private var currentInstant : String = _
203200
204201 // The CDC file split that is currently being processed
205- private var currentChangeFile : HoodieCDCFileSplit = _
202+ private var currentCDCFileSplit : HoodieCDCFileSplit = _
206203
207204 /**
208205 * Two cases will use this to iterate over the records:
@@ -258,10 +255,10 @@ class HoodieCDCRDD(
258255 if (needLoadNextFile) {
259256 loadCdcFile()
260257 }
261- if (currentChangeFile == null ) {
258+ if (currentCDCFileSplit == null ) {
262259 false
263260 } else {
264- currentChangeFile .getCdcInferCase match {
261+ currentCDCFileSplit .getCdcInferCase match {
265262 case BASE_FILE_INSERT | BASE_FILE_DELETE | REPLACE_COMMIT =>
266263 if (recordIter.hasNext && loadNext()) {
267264 true
@@ -292,7 +289,7 @@ class HoodieCDCRDD(
292289
293290 def loadNext (): Boolean = {
294291 var loaded = false
295- currentChangeFile .getCdcInferCase match {
292+ currentCDCFileSplit .getCdcInferCase match {
296293 case BASE_FILE_INSERT =>
297294 val originRecord = recordIter.next()
298295 recordToLoad.update(3 , convertRowToJsonString(originRecord))
@@ -416,48 +413,48 @@ class HoodieCDCRDD(
416413 if (cdcFileIter.hasNext) {
417414 val split = cdcFileIter.next()
418415 currentInstant = split.getInstant
419- currentChangeFile = split
420- currentChangeFile .getCdcInferCase match {
416+ currentCDCFileSplit = split
417+ currentCDCFileSplit .getCdcInferCase match {
421418 case BASE_FILE_INSERT =>
422- assert(currentChangeFile .getCdcFiles != null && currentChangeFile .getCdcFiles.size() == 1 )
423- val absCDCPath = new Path (basePath, currentChangeFile .getCdcFiles.get(0 ))
419+ assert(currentCDCFileSplit .getCdcFiles != null && currentCDCFileSplit .getCdcFiles.size() == 1 )
420+ val absCDCPath = new Path (basePath, currentCDCFileSplit .getCdcFiles.get(0 ))
424421 val fileStatus = fs.getFileStatus(absCDCPath)
425422 val pf = PartitionedFile (InternalRow .empty, absCDCPath.toUri.toString, 0 , fileStatus.getLen)
426423 recordIter = parquetReader(pf)
427424 case BASE_FILE_DELETE =>
428- assert(currentChangeFile .getBeforeFileSlice.isPresent)
429- recordIter = loadFileSlice(currentChangeFile .getBeforeFileSlice.get)
425+ assert(currentCDCFileSplit .getBeforeFileSlice.isPresent)
426+ recordIter = loadFileSlice(currentCDCFileSplit .getBeforeFileSlice.get)
430427 case LOG_FILE =>
431- assert(currentChangeFile .getCdcFiles != null && currentChangeFile .getCdcFiles.size() == 1
432- && currentChangeFile .getBeforeFileSlice.isPresent)
433- loadBeforeFileSliceIfNeeded(currentChangeFile .getBeforeFileSlice.get)
434- val absLogPath = new Path (basePath, currentChangeFile .getCdcFiles.get(0 ))
428+ assert(currentCDCFileSplit .getCdcFiles != null && currentCDCFileSplit .getCdcFiles.size() == 1
429+ && currentCDCFileSplit .getBeforeFileSlice.isPresent)
430+ loadBeforeFileSliceIfNeeded(currentCDCFileSplit .getBeforeFileSlice.get)
431+ val absLogPath = new Path (basePath, currentCDCFileSplit .getCdcFiles.get(0 ))
435432 val morSplit = HoodieMergeOnReadFileSplit (None , List (new HoodieLogFile (fs.getFileStatus(absLogPath))))
436433 val logFileIterator = new LogFileIterator (morSplit, originTableSchema, originTableSchema, tableState, conf)
437434 logRecordIter = logFileIterator.logRecordsPairIterator
438435 case AS_IS =>
439- assert(currentChangeFile .getCdcFiles != null && ! currentChangeFile .getCdcFiles.isEmpty)
436+ assert(currentCDCFileSplit .getCdcFiles != null && ! currentCDCFileSplit .getCdcFiles.isEmpty)
440437 // load beforeFileSlice to beforeImageRecords
441- if (currentChangeFile .getBeforeFileSlice.isPresent) {
442- loadBeforeFileSliceIfNeeded(currentChangeFile .getBeforeFileSlice.get)
438+ if (currentCDCFileSplit .getBeforeFileSlice.isPresent) {
439+ loadBeforeFileSliceIfNeeded(currentCDCFileSplit .getBeforeFileSlice.get)
443440 }
444441 // load afterFileSlice to afterImageRecords
445- if (currentChangeFile .getAfterFileSlice.isPresent) {
446- val iter = loadFileSlice(currentChangeFile .getAfterFileSlice.get())
442+ if (currentCDCFileSplit .getAfterFileSlice.isPresent) {
443+ val iter = loadFileSlice(currentCDCFileSplit .getAfterFileSlice.get())
447444 afterImageRecords = mutable.Map .empty
448445 iter.foreach { row =>
449446 val key = getRecordKey(row)
450447 afterImageRecords.put(key, row.copy())
451448 }
452449 }
453450
454- val cdcLogFiles = currentChangeFile .getCdcFiles.asScala.map { cdcFile =>
451+ val cdcLogFiles = currentCDCFileSplit .getCdcFiles.asScala.map { cdcFile =>
455452 new HoodieLogFile (fs.getFileStatus(new Path (basePath, cdcFile)))
456453 }.toArray
457454 cdcLogRecordIterator = new HoodieCDCLogRecordIterator (fs, cdcLogFiles, cdcAvroSchema)
458455 case REPLACE_COMMIT =>
459- if (currentChangeFile .getBeforeFileSlice.isPresent) {
460- loadBeforeFileSliceIfNeeded(currentChangeFile .getBeforeFileSlice.get)
456+ if (currentCDCFileSplit .getBeforeFileSlice.isPresent) {
457+ loadBeforeFileSliceIfNeeded(currentCDCFileSplit .getBeforeFileSlice.get)
461458 }
462459 recordIter = beforeImageRecords.values.map { record =>
463460 deserialize(record)
@@ -467,15 +464,15 @@ class HoodieCDCRDD(
467464 resetRecordFormat()
468465 } else {
469466 currentInstant = null
470- currentChangeFile = null
467+ currentCDCFileSplit = null
471468 }
472469 }
473470
474471 /**
475472 * Pre-initialize a subset of the fields of the record to be returned, to speed up processing.
476473 */
477474 private def resetRecordFormat (): Unit = {
478- recordToLoad = currentChangeFile .getCdcInferCase match {
475+ recordToLoad = currentCDCFileSplit .getCdcInferCase match {
479476 case BASE_FILE_INSERT =>
480477 InternalRow .fromSeq(Array (
481478 CDCRelation .CDC_OPERATION_INSERT , convertToUTF8String(currentInstant),
0 commit comments