202 changes: 135 additions & 67 deletions rfc/rfc-51/rfc-51.md
@@ -26,6 +26,7 @@
- @Raymond
- @Vinoth
- @Danny
- @Prasanna

# Status
JIRA: [https://issues.apache.org/jira/browse/HUDI-3478](https://issues.apache.org/jira/browse/HUDI-3478)
@@ -42,11 +43,11 @@ In cases where Hudi tables are used as streaming sources, we want to be aware of all

To implement this feature, we need logic on the write and read paths that lets Hudi figure out the changed data at read time. In some cases, we need to write extra data to help optimize CDC queries.

## Scenarios
## Scenario Illustration

Here is a simple case to explain the CDC.
The diagram below illustrates a typical CDC scenario.

![](scenario-definition.jpg)
![](scenario-illustration.jpg)

We follow the Debezium output format: four columns as shown below

@@ -62,73 +63,79 @@ We follow the debezium output format: four columns as shown below
- u: represents `update`; when `op` is `u`, neither `before` nor `after` is null;
- d: represents `delete`; when `op` is `d`, `after` is always null;
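
To make the four-column layout concrete, here is a minimal sketch of it as a Spark schema. This is a non-authoritative illustration: the column names follow the Debezium-style format above, while the exact physical types and encoding Hudi uses may differ.

```scala
import org.apache.spark.sql.types._

// Debezium-style CDC result layout: op, ts_ms, before, after.
// before/after are shown as JSON strings here purely for illustration.
val cdcSchema: StructType = StructType(Seq(
  StructField("op", StringType, nullable = false),     // "i", "u" or "d"
  StructField("ts_ms", StringType, nullable = false),  // commit time of the change
  StructField("before", StringType, nullable = true),  // before-image; null when op = "i"
  StructField("after", StringType, nullable = true)    // after-image; null when op = "d"
))
```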

Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
**Note**

## Goals
* In case of the same record having operations like insert -> delete -> insert, CDC data should be produced to reflect the exact behaviors.
* The illustration above ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.

1. Support row-level CDC records generation and persistence;
2. Support both MOR and COW tables;
3. Support all the write operations;
4. Support Spark DataFrame/SQL/Streaming Query;
## Design Goals

## Implementation
1. Support row-level CDC records generation and persistence
2. Support both MOR and COW tables
3. Support all the write operations
4. Support incremental queries in CDC format across supported engines
> **Contributor:** Let's also explicitly call out that:
>
> - For CDC-enabled tables, performance of non-CDC queries should not be affected.

> **Member Author:** done
### CDC Architecture
## Configurations

![](arch.jpg)
| key | default | description |
|-----------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| hoodie.table.cdc.enabled | `false` | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly. |
| hoodie.table.cdc.supplemental.logging.mode | `KEY_OP` | A mode to indicate the level of changed data being persisted. At the minimum level, `KEY_OP` indicates changed records' keys and operations to be persisted. `DATA_BEFORE`: persist records' before-images in addition to `KEY_OP`. `DATA_BEFORE_AFTER`: persist records' after-images in addition to `DATA_BEFORE`. |

Note: table operations like `Compact`, `Clean`, and `Index` do not write/change any data, so we don't need to consider them in the CDC scenario.

### Modifying code paths
To perform CDC queries, users need to set `hoodie.datasource.query.incremental.format=cdc` and `hoodie.datasource.query.type=incremental`.

![](points.jpg)
| key | default | description |
|--------------------------------------------|----------------|--------------------------------------------------------------------------------------------------------------------------------------|
| hoodie.datasource.query.type | `snapshot` | set to `incremental` for incremental query. |
| hoodie.datasource.query.incremental.format | `latest_state` | `latest_state` (current incremental query behavior) returns the latest records' values. Set to `cdc` to return the full CDC results. |
| hoodie.datasource.read.begin.instanttime | - | required. |
| hoodie.datasource.read.end.instanttime | - | optional. |

### Config Definitions
### Logical File Types

Define a new config:

| key | default | description |
| --- | --- | --- |
| hoodie.table.cdc.enabled | false | `true` represents the table to be used for CDC queries and will write cdc data if needed. |
| hoodie.table.cdc.supplemental.logging | true | If true, persist all the required information about the change data, including 'before' and 'after'. Otherwise, just persist the 'op' and the record key. |

Other existing config that can be reused in cdc mode is as following:
Define another query mode named `cdc`, which is similar to `snapshot`, `read_optimized` and `incremental`.
When reading in cdc mode, set `hoodie.datasource.query.type` to `cdc`.

| key | default | description |
| --- |---| --- |
| hoodie.datasource.query.type | snapshot | set to `cdc` to enable the cdc query mode |
| hoodie.datasource.read.start.timestamp | - | required. |
| hoodie.datasource.read.end.timestamp | - | optional. |


### CDC File Types

Here we define 5 cdc file types in CDC scenario.
We define 4 logical file types for the CDC scenario.

- CDC_LOG_File: a file consisting of CDC blocks that hold the changed data related to one commit.
  - when `hoodie.table.cdc.supplemental.logging` is true, it keeps all the fields about the change data, including `op`, `ts_ms`, `before` and `after`. When querying the Hudi table in cdc query mode, we load this file and return it directly.
  - when `hoodie.table.cdc.supplemental.logging` is false, it just keeps `op` and the key of the changed record. When querying the Hudi table in cdc query mode, we need to load the previous and current versions of the touched file slice to extract the other info like `before` and `after` on the fly.
  - For COW tables, this file type refers to newly written log files alongside base files. The log files in this case only contain CDC info.
  - For MOR tables, this file type refers to the typical log files in MOR tables. CDC info will be persisted as log blocks in the log files.
- ADD_BASE_File: a normal base file for a specified instant and a specified file group. All the data in this file is newly written, e.g. when we first write data to a new file group. So we can load this file, treat each record in it as the value of `after`, and set the value of `op` of each record to `i`.
- REMOVE_BASE_FILE: a normal base file for a specified instant and a specified file group, but this file is empty. A file like this is generated when we delete all the data in a file group. So we need to find the previous version of the file group, load it, treat each record in it as the value of `before`, and set the value of `op` of each record to `d`.
- MOR_LOG_FILE: a normal log file. For this type, we need to load the previous version of the file slice, and merge each record in the log file with that data to determine how the record has changed, and to get the values of `before` and `after`.
- REPLACED_FILE_GROUP: a file group that is replaced entirely, e.g. by `DELETE_PARTITION` and `INSERT_OVERWRITE` operations. We load this file group, treat all the records as the value of `before`, and set the value of `op` of each record to `d`.

Note:

- **Only `CDC_LOG_File` is a new file type and written out by CDC**. `ADD_BASE_File`, `REMOVE_BASE_FILE`, `MOR_LOG_FILE` and `REPLACED_FILE_GROUP` are just representations of the existing data files in the CDC scenario. For example:
  - An `INSERT` operation may create a list of new data files. These files will be treated as ADD_BASE_FILE;
  - A `DELETE_PARTITION` operation will replace a list of file slices. For each of these, we get the CDC data in the `REPLACED_FILE_GROUP` way.
**`CDC_LOG_File` is a new file type and written out for CDC**. `ADD_BASE_File`, `REMOVE_BASE_FILE`, and `REPLACED_FILE_GROUP` represent the existing data files in the CDC scenario.

### Write
For example:
- An `INSERT` operation may create a list of new data files. These files will be treated as ADD_BASE_FILE;
- A `DELETE_PARTITION` operation will replace a list of file slices. For each of these, we get the CDC data in the `REPLACED_FILE_GROUP` way (a sketch of how each file type maps to CDC rows follows below).
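
As a rough, non-authoritative illustration of that mapping, the sketch below shows how the logical file types could be turned into CDC rows at query time. The `current` and `previous` DataFrames stand in for the real base/log file readers; this is not Hudi's actual reader code.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._

// current: records of the file written in the queried commit
// previous: records of the previous version of the file group / file slice (if any)
def cdcRowsFor(fileType: String, current: DataFrame, previous: Option[DataFrame]): DataFrame = {
  def asStruct(df: DataFrame): Column = struct(df.columns.map(col): _*)
  fileType match {
    case "ADD_BASE_FILE" =>
      // every record is newly added: op = i, before = null, after = the record
      current.select(lit("i").as("op"), lit(null).as("before"), asStruct(current).as("after"))
    case "REMOVE_BASE_FILE" | "REPLACED_FILE_GROUP" =>
      // every previously stored record is gone: op = d, before = the record, after = null
      val prev = previous.get
      prev.select(lit("d").as("op"), asStruct(prev).as("before"), lit(null).as("after"))
    case "MOR_LOG_FILE" =>
      // requires record-level merging of the log file against the previous file slice
      // to derive op / before / after; omitted in this sketch
      ???
  }
}
```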

## When `supplemental.logging.mode=KEY_OP`

In this mode, we minimize the additional storage for CDC information.

The idea is to **write CDC files as little as possible, and reuse data files as much as possible**.
- On write, only the change type (`op`) and the record keys are persisted.
- On read, the changed info will be inferred on the fly, which costs more compute. As the `op`s and record keys are
available, inference using the current and previous committed data can be optimized by reducing the IO cost of reading the
previously committed data, i.e., only the changed records need to be read (see the sketch below).
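
A minimal sketch of this on-the-fly inference, assuming three input DataFrames: `keyOp` holding the persisted record keys and `op`s, and `prev`/`curr` holding the previous and current committed records of the touched file slices. This illustrates the idea only and is not Hudi's implementation.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def inferCdc(keyOp: DataFrame, prev: DataFrame, curr: DataFrame): DataFrame = {
  val changedKeys = keyOp.select("_hoodie_record_key")

  // Read only the changed records from the previous and current committed data.
  val before = prev.join(changedKeys, Seq("_hoodie_record_key"), "left_semi")
    .select(col("_hoodie_record_key"), struct("*").as("before"))
  val after = curr.join(changedKeys, Seq("_hoodie_record_key"), "left_semi")
    .select(col("_hoodie_record_key"), struct("*").as("after"))

  // op = i has no before-image and op = d has no after-image, hence the left joins.
  keyOp
    .join(before, Seq("_hoodie_record_key"), "left")
    .join(after, Seq("_hoodie_record_key"), "left")
    .select(col("op"), col("before"), col("after"))
}
```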

Hudi writes data by `HoodieWriteHandle`.
We notice that only `HoodieMergeHandle` and its subclasses will receive both the old record and the new-coming record at the same time, merge and write.
So we will add a `LogFormatWriter` in these classes. If there is CDC data to be written out, this writer is called to write out a log file consisting of `CDCBlock`s.
The CDC log file will be placed in the same location as the base files and other log files, so that the clean service can clean them up without extra work. The file structure is like:
The detailed logical flows for write and read scenarios are the same regardless of `logging.mode`, which will be
illustrated in the section below.

## When `supplemental.logging.mode=DATA_BEFORE` or `DATA_BEFORE_AFTER`

Overall logic flows are illustrated below.

![](logic-flows.jpg)

### Write

Hudi writes data by `HoodieWriteHandle`. We notice that only `HoodieMergeHandle` and its subclasses will receive both
the old record and the new-coming record at the same time, merge and write. So we will add a `LogFormatWriter` in these
classes. If there is CDC data to be written out, this writer is called to write out a log file consisting
of `CDCBlock`s. The CDC log file will be placed in the same location as the base files and other log files, so that the
clean service can clean them up without extra work. The file structure is like:
> **Contributor:** AFAIU CDCBlocks will be persisted w/in the same log-files as data-blocks for MOR, right?
>
> I don't think we can do that as this will have serious performance implications for MOR tables that will have to skip over CDCBlocks for pure non-CDC queries.

> **Member Author:** @alexeykudinkin because cdc log block can be skipped by extracting some indicator bytes of data, we think that it is minimal impact to normal read and so mix it with data log block, to stay consistent with data files' lifecycle like cleaning. Chatted with @YannByron, who will look into a different name scheme (e.g. .cdc.log) and how the cleaner should be tweaked for it.

> **Contributor:** Understood. I appreciate the intent, but i think we should prioritize querying performance over how much we need to tweak the cleaner to support it.
>
> Problem w/ existing approach is that we can't cleanly separate out CDC/Data log-files w/o having separate naming schemes for them -- even though we might be creating a new CDC log file for every write we do, whenever we do Delta Commit (on HDFS) we will be appending the log-blocks to the latest log-file present w/o checking whether it's a CDC or Data log-file.

> **Member Author:** discussed with @YannByron that we can have a type of log file with -cdc suffix. updated this part.

```
hudi_cdc_table/
```

@@ -148,20 +155,46 @@

Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.

There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction

There are two design choices on when to persist CDC in MOR tables:

Write-on-indexing allows CDC info to be persisted at the earliest; however, in the case of the Flink writer or Bucket
indexing, the `op` (I/U/D) data is not available at indexing time.

Write-on-compaction can always persist CDC info and achieves standardization of the implementation logic across engines;
however, some delay is added to the CDC query results. Based on the business requirements, Log Compaction (RFC-48) or
scheduling more frequent compaction can be used to minimize the latency.
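
For instance, if CDC persistence is tied to compaction as proposed, the CDC availability delay can be bounded by compacting more frequently. Below is a hedged sketch using existing Hudi inline-compaction options, following the DataSource write style used later in this RFC; the values are purely illustrative.

```scala
// Compact inline after every delta commit so persisted CDC data lags writes by at most one commit.
// This trades write throughput for CDC freshness; tune the threshold to the business SLA.
df.write.format("hudi").
  option("hoodie.table.cdc.enabled", "true").
  option("hoodie.compact.inline", "true").
  option("hoodie.compact.inline.max.delta.commits", "1").
  save("/path/to/hudi")
```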

Spark DataSource example:
The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.
> **Contributor:** I don't think i understand the proposal here: are we saying that we're not going to be producing the CDC records until compaction is performed?
>
> I think this is a serious consistency gap: after Delta Commit records are already persisted and are visible to queries, hence someone can actually read the table and see these records persisted but when issuing a CDC query they won't see these records.
>
> Can you please elaborate what is the challenge here? What's the issue w/ Flink writer? Why does Bucket Index affect this?

> **Member Author:** it is meant for persisting CDC data in -cdc log files; it does not mean the CDC result is not available to users. Users will get the freshest CDC via on-the-fly inference. The problem with flink or bucket index is op (I/U/D) is not available during delta commit hence we can't persist CDC at that point. That's why persisting CDC is deferred to the compactor for standardization. I have improved the language below this part to make it clearer.

> **Contributor:** @xushiyan can you elaborate about "on-the-fly inference", i don't think i saw it being mentioned anywhere in the RFC.
>
> I still don't think i fully understand how we're going to be tackling the issue of CDC records being deferred until the compactor runs. What if the compactor isn't even set up to run for the table?

> **Member Author:** @alexeykudinkin in the last update, i added a table in the Read section "How to infer CDC results", which explains on-the-fly inference. In case of no compaction for the table, CDC queries will always do on-the-fly inference. The Read section "MOR" subsection describes this.

- For Spark
  - inserts are written to base files: the CDC data `op=I` will be persisted
  - updates/deletes that are written to log files are compacted into base files: the CDC data `op=U|D` will be persisted
- For Flink
  - inserts/updates/deletes that are written to log files are compacted into base files: the CDC data `op=I|U|D` will be
    persisted

In summary, we propose that CDC data be persisted synchronously upon base file generation. It is therefore
write-on-indexing for Spark inserts (non-bucket index) and write-on-compaction for everything else.

Note that it may also be necessary to provide capabilities for asynchronously persisting CDC data, in terms of a
separate table service like `ChangeTrackingService`, which can be scheduled to fine-tune the CDC availability SLA,
effectively decoupling it from compaction frequency.

#### Examples

Spark DataSource:

```scala
df.write.format("hudi").
  options(commonOpts).
option("hoodie.table.cdc.enabled", "true").
option("hoodie.table.cdc.supplemental.logging", "true"). //enable cdc supplemental logging
// option("hoodie.table.cdc.supplemental.logging", "false"). //disable cdc supplemental logging
option("hoodie.table.cdc.supplemental.logging.mode", "DATA_AFTER").
save("/path/to/hudi")
```

Spark SQL example:
Spark SQL:

```sql
-- create a hudi table that enables cdc
@@ -176,18 +209,18 @@ tblproperties (
'primaryKey' = 'id',
'preCombineField' = 'ts',
'hoodie.table.cdc.enabled' = 'true',
'hoodie.table.cdc.supplemental.logging' = 'true',
'hoodie.table.cdc.supplemental.logging.mode' = 'DATA_AFTER',
'type' = 'cow'
)
```

### Read

This part just discusses how to make Spark (including Spark DataFrame, SQL, Streaming) read the Hudi CDC data.
This section uses Spark (incl. Spark DataFrame, SQL, Streaming) as an example to perform CDC-format incremental queries.

Implement `CDCReader`, which does these steps to respond to a CDC request (a rough sketch follows the notes below):

- judge whether this is a table that has enabled `hoodie.table.cdc.enabled`, and the query range is valid.
- check if `hoodie.table.cdc.enabled=true`, and if the query range is valid.
- extract and filter the commits needed from `ActiveTimeline`.
- For each commit, get and load the changed files, union them and return a `DataFrame`.
- We use different ways to extract data according to the different file types; for details see the description of the logical file types above.
@@ -215,18 +248,31 @@ Note:

- Only instants that are active can be queried in a CDC scenario.
- `CDCReader` manages all the things on CDC, and all the Spark entry points (DataFrame, SQL, Streaming) call the functions in `CDCReader`.
- If `hoodie.table.cdc.supplemental.logging` is false, we need to do more work to get the change data. The following illustration explains the difference when this config is true or false.
- If `hoodie.table.cdc.supplemental.logging.mode=KEY_OP`, we need to compute the changed data. The following illustrates the difference.

![](read_cdc_log_file.jpg)
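
A rough structural sketch of how the `CDCReader` steps above compose. The class and helper names here are hypothetical, and the timeline / file-group access is stubbed out; this is not the proposed implementation itself.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

class CdcReaderSketch(spark: SparkSession, basePath: String) {

  def read(beginTime: String, endTime: Option[String]): DataFrame = {
    // 1. the table must be CDC-enabled and the requested range valid
    require(cdcEnabled(basePath), "hoodie.table.cdc.enabled must be true for CDC queries")

    // 2. extract and filter the commits needed from the active timeline
    val commits = activeCommitsInRange(basePath, beginTime, endTime)

    // 3. per commit, load the changed files (dispatching on the logical file type)
    //    and union the per-commit CDC frames into one DataFrame
    commits.map(extractCdcForCommit).reduce(_ union _)
  }

  // hypothetical helpers standing in for ActiveTimeline / file-system view access
  private def cdcEnabled(path: String): Boolean = ???
  private def activeCommitsInRange(path: String, begin: String, end: Option[String]): Seq[String] = ???
  private def extractCdcForCommit(commit: String): DataFrame = ???
}
```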

#### COW table

Reading COW table in CDC query mode is equivalent to reading a simplified MOR table that has no normal log files.
Reading COW tables in CDC query mode is equivalent to reading MOR tables in RO mode.

#### MOR table

According to the design of the write path, CDC files are written out only in the cases where writing a MOR table also writes out base files (i.e. via `HoodieMergeHandle` and its subclasses).
In other words, CDC files are written out only for index and file-size reasons.
According to the section "Persisting CDC in MOR", CDC data is available upon base files' generation.

When users want to get fresher real-time CDC results:

- users are to set `hoodie.datasource.query.incremental.type=snapshot`
- the implementation logic is to compute the results in-flight by reading log files and the corresponding base files (
current and previous file slices).
- this is equivalent to running incremental-query on MOR RT tables

When users want to optimize compute cost and are tolerant of latency in the CDC results (see the sketch after this list),

- users are to set `hoodie.datasource.query.incremental.type=read_optimized`
- the implementation logic is to extract the results by reading persisted CDC data and the corresponding base files (
current and previous file slices).
- this is equivalent to running incremental-query on MOR RO tables
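
To make the two settings above concrete, here is a hedged Spark sketch; the option keys are the ones proposed in this RFC and the timestamp is illustrative.

```scala
// Freshest CDC results, inferred on the fly from log files + base files:
val freshCdc = spark.read.format("hudi").
  option("hoodie.datasource.query.type", "incremental").
  option("hoodie.datasource.query.incremental.format", "cdc").
  option("hoodie.datasource.query.incremental.type", "snapshot").
  option("hoodie.datasource.read.begin.instanttime", "20220426103000000").
  load("/path/to/hudi")

// Cheaper CDC results read from persisted CDC data, with some extra latency:
val roCdc = spark.read.format("hudi").
  option("hoodie.datasource.query.type", "incremental").
  option("hoodie.datasource.query.incremental.format", "cdc").
  option("hoodie.datasource.query.incremental.type", "read_optimized").
  option("hoodie.datasource.read.begin.instanttime", "20220426103000000").
  load("/path/to/hudi")
```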

Here is an illustration to explain how we can query the CDC data on MOR tables in various cases.

@@ -238,7 +284,8 @@ Spark DataSource

```scala
spark.read.format("hudi").
option("hoodie.datasource.query.type", "cdc").
option("hoodie.datasource.query.type", "incremental").
option("hoodie.datasource.query.incremental.format", "cdc").
option("hoodie.datasource.read.begin.instanttime", "20220426103000000").
option("hoodie.datasource.read.end.instanttime", "20220426113000000").
load("/path/to/hudi")
@@ -261,7 +308,7 @@ Spark Streaming

```scala
val df = spark.readStream.format("hudi").
option("hoodie.datasource.query.type", "cdc").
option("hoodie.datasource.query.type", "incremental").
load("/path/to/hudi")

// launch a streaming query which starts from the current snapshot of the hudi table,
@@ -271,11 +318,32 @@ val stream = df.writeStream.format("console").start

# Rollout/Adoption Plan

The CDC feature can be enabled by the corresponding configuration, which defaults to false. Using this feature does not depend on the Spark version.
Spark support phase 1

- For COW: support Spark CDC write/read fully
- For MOR: support Spark CDC write (only `OP=I` when writing inserts to base files) and CDC read
  when `hoodie.datasource.query.incremental.type=snapshot`

Spark support phase 2

- For MOR: Spark CDC write (`OP=U/D` when compacting updates/deletes in log files) and CDC read
  when `hoodie.datasource.query.incremental.type=read_optimized`
- Note: for CDC write via compaction, `HoodieMergedLogRecordScanner` needs to support producing CDC data for each
  version of the changed records. `HoodieCompactor` and `HoodieMergeHandler` are to adapt to the changes.

Flink support can be developed in parallel, and can make use of the common logical changes for CDC write via compaction
from Spark support phase 2.

# Test Plan

- [ ] Unit tests for this
- [ ] Production end-to-end integration test
- [ ] Benchmark snapshot query for large tables
- Unit tests for this
- Production end-to-end integration test
- Benchmark snapshot query for large tables

# Appendix

## Affected code paths

For `supplemental.logging.mode=DATA_BEFORE` or `DATA_AFTER`

![](code-paths.jpg)