[SPARK-48772][SS][SQL] State Data Source Change Feed Reader Mode #47188
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -471,12 +471,17 @@ trait SupportsFineGrainedReplay { | |
| } | ||
|
|
||
| /** | ||
| * Return a [[StateStoreChangeDataReader]] that reads the changelogs entries from startVersion to | ||
| * Return an iterator that reads all the entries of changelogs from startVersion to | ||
| * endVersion. | ||
| * Each record is represented by a tuple of (recordType: [[RecordType.Value]], key: [[UnsafeRow]], | ||
| * value: [[UnsafeRow]], batchId: [[Long]]) | ||
| * A put record is returned as a tuple(recordType, key, value, batchId) | ||
| * A delete record is returned as a tuple(recordType, key, null, batchId) | ||
| * | ||
| * @param startVersion starting changelog version | ||
| * @param endVersion ending changelog version | ||
| * @return | ||
| * @return tuple(recordType: [[RecordType.Value]], nested key: [[UnsafeRow]], | ||
| * nested value: [[UnsafeRow]], batchId: [[Long]]) | ||
| */ | ||
| def getStateStoreChangeDataReader(startVersion: Long, endVersion: Long): | ||
|
Contributor
Strictly speaking, third-party state store providers can implement their own format for delta/changelog files. We need to define an interface for the change data reader, and provide a built-in implementation of that interface that works for both HDFS and RocksDB.
Contributor
Author
I think users can extend
Contributor
But StateStoreChangeDataReader is only helpful when their implementation is very similar to the built-in one, right? If they take a totally different approach to maintaining the changelog, they will have to reimplement everything, and it is not clear what needs to be implemented. An interface declares the spec. Whenever we design pluggable functionality, please be sure to define the spec and describe it as an interface. Don't make others struggle to understand the spec from the actual implementation.
Contributor
Author
I see your point now. To make the interface easier for users to implement, why don't we use the superclass of
Contributor
Sorry, one last small suggestion: could we please put the information about the tuple in the method doc? As it stands, it is not self-explanatory. |
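To make the "describe the spec as an interface" request concrete, here is a minimal sketch of one possible shape. It is not the code in this PR. `ChangeDataReader`, `SupportsChangeData`, `InMemoryChangeDataReader`, and `Row` are hypothetical names, `RecordType` is a stand-in for Spark's enumeration, and `Row` replaces `UnsafeRow` so the sketch compiles without Spark. The sketch also assumes the changelog version can be reported directly as the batch id, which the diff does not state.

```scala
// Stand-in types so the sketch compiles without Spark on the classpath.
// Assumption: in Spark these would be RecordType.Value and UnsafeRow.
object RecordType extends Enumeration {
  val PUT_RECORD, DELETE_RECORD = Value
}
final case class Row(text: String)

// Hypothetical interface: the spec a provider has to satisfy.
// Each element is (recordType, key, value, batchId); value is null for deletes.
trait ChangeDataReader
    extends Iterator[(RecordType.Value, Row, Row, Long)]
    with AutoCloseable

// Hypothetical provider-side hook that hands out a reader for a version range.
trait SupportsChangeData {
  def getChangeDataReader(startVersion: Long, endVersion: Long): ChangeDataReader
}

// Toy implementation backed by an in-memory map of version -> change records.
// A third-party provider would read its own changelog format here instead.
final class InMemoryChangeDataReader(
    changes: Map[Long, Seq[(RecordType.Value, Row, Row)]],
    startVersion: Long,
    endVersion: Long) extends ChangeDataReader {

  // Walk versions in order; a version with no changelog yields no records.
  // Assumption: the version itself is emitted as the batch id.
  private val underlying: Iterator[(RecordType.Value, Row, Row, Long)] =
    (startVersion to endVersion).iterator.flatMap { version =>
      changes.getOrElse(version, Seq.empty).iterator.map {
        case (recordType, key, value) => (recordType, key, value, version)
      }
    }

  override def hasNext: Boolean = underlying.hasNext
  override def next(): (RecordType.Value, Row, Row, Long) = underlying.next()
  override def close(): Unit = () // nothing to release in the in-memory case
}
```

With a trait like this, a provider with a completely different changelog layout only has to produce the documented tuples in version order, and does not need to mirror the built-in reader's internals.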
||
| NextIterator[(RecordType.Value, UnsafeRow, UnsafeRow, Long)] | ||
|
|
||
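The method doc above describes each record as a four-element tuple whose value slot is `null` for deletes. A caller might decode that tuple as sketched below. This is an illustration only: `Change`, `Put`, `Delete`, `ChangeDecoder`, and `Row` are hypothetical names, and `RecordType` here is a local stand-in for Spark's enumeration so the snippet compiles on its own.

```scala
// Stand-in types; assumption: Spark's RecordType.Value and UnsafeRow in practice.
object RecordType extends Enumeration {
  val PUT_RECORD, DELETE_RECORD = Value
}
final case class Row(text: String)

// Hypothetical decoded form that avoids passing a null value around.
sealed trait Change { def key: Row; def batchId: Long }
final case class Put(key: Row, value: Row, batchId: Long) extends Change
final case class Delete(key: Row, batchId: Long) extends Change

object ChangeDecoder {
  // Turn the documented (recordType, key, value, batchId) tuple into a Change.
  def decode(record: (RecordType.Value, Row, Row, Long)): Change = record match {
    case (RecordType.PUT_RECORD, key, value, batchId) => Put(key, value, batchId)
    // The value slot is null for deletes, so it is ignored here.
    case (RecordType.DELETE_RECORD, key, _, batchId) => Delete(key, batchId)
    case (other, _, _, _) =>
      throw new IllegalArgumentException(s"Unsupported record type: $other")
  }
}
```

Decoding at the boundary keeps the `null` convention confined to the reader's output, so downstream code matches on `Put` and `Delete` instead of checking for `null`.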
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -403,7 +403,7 @@ class StateStoreChangelogReaderV2( | |
|
|
||
| /** | ||
| * Base class representing an iterator that iterates over a range of changelog files in a state | ||
| * store. In each iteration, it will return a tuple of (changeType: [[RecordType]], | ||
| * store. In each iteration, it will return a ByteArrayPair of (changeType: [[RecordType]], | ||
|
||
| * nested key: [[UnsafeRow]], nested value: [[UnsafeRow]], batchId: [[Long]]) | ||
| * | ||
| * @param fm checkpoint file manager used to manage streaming query checkpoint | ||
|
|
||