Skip to content

Commit 7c6cdad

Browse files
unify the two traits
1 parent cd6a39b commit 7c6cdad

4 files changed

Lines changed: 17 additions & 13 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,11 @@ class StateStoreChangeDataPartitionReader(
158158
schema: StructType) extends StatePartitionReader(storeConf, hadoopConf, partition, schema) {
159159

160160
private lazy val cdcReader: StateStoreChangeDataReader = {
161-
provider.asInstanceOf[SupportsStateStoreChangeDataFeed]
161+
if (!provider.isInstanceOf[SupportsFineGrainedReplay]) {
162+
throw StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
163+
provider.getClass.toString)
164+
}
165+
provider.asInstanceOf[SupportsFineGrainedReplay]
162166
.getStateStoreChangeDataReader(
163167
partition.sourceOptions.cdcStartBatchID.get + 1,
164168
partition.sourceOptions.cdcEndBatchId.get + 1)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -440,9 +440,9 @@ object StateStoreProvider {
440440
}
441441

442442
/**
443-
* This is an optional trait to be implemented by [[StateStoreProvider]]s that can read fine
444-
* grained state data which is replayed from a specific snapshot version. It is used by the
445-
* snapshotStartBatchId option in state data source.
443+
* This is an optional trait to be implemented by [[StateStoreProvider]]s that can read the change
444+
* of state store over batches. This is used by State Data Source with additional options like
445+
* snapshotStartBatchId or readChangeFeed.
446446
*/
447447
trait SupportsFineGrainedReplay {
448448
/**
@@ -469,6 +469,15 @@ trait SupportsFineGrainedReplay {
469469
def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long): ReadStateStore = {
470470
new WrappedReadStateStore(replayStateFromSnapshot(snapshotVersion, endVersion))
471471
}
472+
473+
/**
474+
*
475+
* @param startVersion
476+
* @param endVersion
477+
* @return
478+
*/
479+
def getStateStoreChangeDataReader(startVersion: Long, endVersion: Long):
480+
StateStoreChangeDataReader
472481
}
473482

474483
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangeDataReader.scala

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,6 @@ import org.apache.spark.sql.execution.streaming.CheckpointFileManager
2626
import org.apache.spark.sql.types.StructType
2727
import org.apache.spark.util.NextIterator
2828

29-
/**
30-
* This is an optional trait for [[StateStoreProvider]]s to mix in if they support reading state
31-
* change data. It is used by the readChangeFeed option of State Data Source.
32-
*/
33-
trait SupportsStateStoreChangeDataFeed {
34-
def getStateStoreChangeDataReader(startVersion: Long, endVersion: Long):
35-
StateStoreChangeDataReader
36-
}
37-
3829
/**
3930
* Base class for state store changelog reader
4031
* @param fm - checkpoint file manager used to manage streaming query checkpoint

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceCDCReadSuite.scala renamed to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala

File renamed without changes.

0 commit comments

Comments
 (0)