Binary files added: rfc/rfc-56/figure1.png, rfc/rfc-56/figure2.png, rfc/rfc-56/figure3.png, rfc/rfc-56/figure4.png, rfc/rfc-56/flow1.png
238 changes: 238 additions & 0 deletions rfc/rfc-56/rfc-56.md
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# RFC-56: Early Conflict Detection For Multi-writer

## Proposers

- @zhangyue19921010

## Approvers

- @yihua

## Status

JIRA: https://issues.apache.org/jira/browse/HUDI-1575

## Abstract

At present, Hudi implements OCC (Optimistic Concurrency Control) based on the timeline to ensure data consistency,
integrity, and correctness between multiple writers. OCC detects conflicts at Hudi's file group level, i.e., two
concurrent writers updating the same file group are detected as a conflict. Currently, the conflict detection is
performed after the data writing is completed and before the commit metadata is written. If any conflict is detected,
cluster resources are wasted because the computation and data writing have already finished.

To solve this problem, this RFC proposes an early conflict detection mechanism that detects conflicts during the
data-writing phase and aborts the writing early if a conflict is detected, using Hudi's marker mechanism. Before writing
each data file, the writer creates a corresponding marker to mark that the file is created, so that the writer can use
the markers to automatically clean up uncommitted data in failure and rollback scenarios. We propose to use the markers
to identify conflicts at the file group level while data is being written. There are some subtle differences in the
early conflict detection workflow between the different marker types. For direct markers, Hudi lists the necessary
marker files directly and performs the conflict check before the writer creates the marker and starts to write the
corresponding data file. For timeline-server-based markers, Hudi fetches the result of the marker conflict check before
the writer creates the marker and starts to write the corresponding data file; the conflicts are checked asynchronously
and periodically so that writing conflicts can be detected as early as possible. Both writers may still write data files
of the same file slice until the conflict is detected in the next round of checking.

What's more, Hudi can stop writing earlier because of early conflict detection and release the resources back to the cluster,
improving resource utilization.

Note that the early conflict detection proposed by this RFC operates within OCC. Any conflict detection outside the
scope of OCC is not handled. For example, the current OCC for multiple writers cannot detect a conflict if two
concurrent writers perform INSERT operations for the same set of record keys, because the writers write to different
file groups. This RFC does not intend to address this problem.

## Background

As we know, transactions and multi-writers of data lakes are becoming the key characteristics of building Lakehouse
these days. Quoting this inspiring blog <strong>Lakehouse Concurrency Control: Are we too optimistic?</strong> directly:
https://hudi.apache.org/blog/2021/12/16/lakehouse-concurrency-control-are-we-too-optimistic/

> "Hudi implements a file level, log based concurrency control protocol on the Hudi timeline, which in-turn relies
> on bare minimum atomic puts to cloud storage. By building on an event log as the central piece for inter process
> coordination, Hudi is able to offer a few flexible deployment models that offer greater concurrency over pure OCC
> approaches that just track table snapshots."

In the multi-writer scenario, Hudi's existing conflict detection occurs after the writer finishes writing the data and
before committing the metadata. In other words, the writer only detects the conflict when it starts to commit, even
though all computation and data writing have already completed, which wastes resources.

For example:

Suppose there are two writing jobs: job1 writes 10 MB of data to the Hudi table, including updates to file group 1.
Another job2 writes 100 GB of data to the Hudi table and also updates the same file group 1.

Job1 finishes and commits to Hudi successfully. After a few hours, job2 finishes writing the data files (100 GB) and
starts to commit the metadata. At this time, a conflict with job1 is found, and job2 has to be aborted and re-run after
the failure. Obviously, a lot of computing resources and time are wasted on job2.

Hudi currently has two important mechanisms, the marker mechanism and the heartbeat mechanism:

1. The marker mechanism tracks all the files that are part of an active write.
2. The heartbeat mechanism tracks all active writers to a Hudi table.

Based on markers and heartbeats, this RFC proposes a new conflict detection mechanism: early conflict detection. Before
the writer creates the marker and starts to write the file, Hudi performs this new conflict detection, trying to detect
the writing conflict directly (for direct markers) or to get the async conflict check result (for timeline-server-based
markers) as early as possible, and aborts the writer when a conflict occurs, so that compute resources are released as
soon as possible and resource utilization improves.

## Implementation

Figure 1 below shows the high-level workflow of early conflict detection. The feature is guarded by a new feature flag.
As we can see, when both `supportsOptimisticConcurrencyControl` and `isEarlyConflictDetectionEnable` (the new feature
flag) are true, the early conflict detection feature is used. Otherwise, the check is skipped and the marker is created
directly.

![](figure1.png)

The three important steps marked in red in Figure 1 are introduced one by one as follows:

### [1] Check Marker Conflict

As we know, Hudi has two ways to create and maintain markers:

1. DirectWriteMarkers: an individual marker file corresponding to each data file is created directly by the writer.
2. TimelineServerBasedWriteMarkers: marker operations are all handled by the timeline service, which serves as a proxy for the writers.

Therefore, for each marker type, we must implement the corresponding conflict detection logic based on the markers. Here
we design a new interface `HoodieEarlyConflictDetectionStrategy` to keep the marker conflict checking extensible.

![](flow1.png)

In this design, we provide `SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy` and
`SimpleDirectMarkerBasedEarlyConflictDetectionStrategy` for DirectWriteMarkers to perform the corresponding conflict
detection and resolution. And we provide `AsyncTimelineMarkerEarlyConflictDetectionStrategy` for
TimelineServerBasedWriteMarkers to perform the corresponding conflict detection and resolution.
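To illustrate the shape of the abstraction (not the actual Hudi API — the method name and the in-memory marker set are assumptions for demonstration), a direct-marker strategy essentially boils down to a membership check on other active writers' markers for the same file group:

```java
import java.util.Set;

// Illustrative sketch only: the method name and the in-memory marker set are
// assumptions. Real strategies list marker files on storage (direct markers)
// or consult the timeline server (timeline-server-based markers).
interface HoodieEarlyConflictDetectionStrategy {
    boolean hasMarkerConflict(String partitionPath, String fileId);
}

class InMemoryDirectMarkerStrategy implements HoodieEarlyConflictDetectionStrategy {
    private final Set<String> otherWritersMarkers;  // "partitionPath/fileId" entries

    InMemoryDirectMarkerStrategy(Set<String> otherWritersMarkers) {
        this.otherWritersMarkers = otherWritersMarkers;
    }

    @Override
    public boolean hasMarkerConflict(String partitionPath, String fileId) {
        // A conflict means another active writer already created a marker
        // for the same file group.
        return otherWritersMarkers.contains(partitionPath + "/" + fileId);
    }
}
```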

#### DirectWriteMarkers related strategy

##### SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy

![](figure2.png)

For this strategy, shown in Figure 2, we need to pay attention to the following details:

First, for the same partitionPath/fileId, only one writer may check and create the corresponding marker file at a time,
to make sure no conflict is missed under race conditions. We propose to use the existing transaction mechanism to lock
at the <strong>partitionPath + "/" + fileId</strong> level before checking conflicts and before creating marker files.
Taking the ZooKeeper lock provider as an example: `LockConfiguration.ZK_LOCK_KEY_PROP_KEY = partitionPath + "/" + fileId`.

In addition, during conflict detection, we do not need to list all marker files in all directories. Instead, we can
prune the markers to check based on the current partition path and fileId to avoid unnecessary list operations. For
example (also shown in Figure 2), suppose we are going to create a marker file for partition path 2022/07/01 and fileId
ff26cb9e-e034-4931-9c59-71bac578f7a0-0. During marker conflict detection, we do not need to list all the partitions
under ".temp"; listing the marker files under $BasePath/.hoodie/.temp/instantTime/2022/07/01 and checking whether fileId
ff26cb9e-e034-4931-9c59-71bac578f7a0-0 exists is sufficient.
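A small sketch of the lock key and the pruned marker directory, assuming the marker layout `$BasePath/.hoodie/.temp/<instantTime>/<partitionPath>/` described above; the class and method names are hypothetical:

```java
// Hypothetical helper names; the layout follows the example in the text:
// $BasePath/.hoodie/.temp/<instantTime>/<partitionPath>/<marker files>.
class MarkerPathPruning {
    // Lock key used to serialize check-and-create per file group,
    // e.g. as LockConfiguration.ZK_LOCK_KEY_PROP_KEY for the ZooKeeper lock provider.
    static String lockKey(String partitionPath, String fileId) {
        return partitionPath + "/" + fileId;
    }

    // Only this single directory needs listing during the conflict check,
    // instead of every partition under .temp.
    static String markerDirToCheck(String basePath, String instantTime, String partitionPath) {
        return basePath + "/.hoodie/.temp/" + instantTime + "/" + partitionPath;
    }
}
```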

##### SimpleDirectMarkerBasedEarlyConflictDetectionStrategy

Compared with `SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy`, this strategy drops the locking steps,
i.e., creating a new transaction, beginning it, and ending it. The advantages are that the check is faster and multiple
writers do not block each other. The downside is that conflict detection may be delayed until the pre-commit stage.

For example, when two writers check partitionPath1 + fileId1 at the same time, both writers may pass the conflict
detection and successfully create a marker, so the conflict detection is delayed. In that case, the conflict can only be
found by the pre-commit conflict detection, and the write fails there. This wastes resources but does not compromise the
correctness of the data.

As we can see, all these direct-marker-based early conflict detection strategies need extra file system calls.

#### TimelineServerBasedWriteMarkers related strategy

##### AsyncTimelineMarkerEarlyConflictDetectionStrategy

This design extends the create-marker API on the timeline server.

![](figure3.png)

As shown in Figure 3, the client side calls create marker API, requesting to create markers.

On the timeline server side, a MarkerCheckerRunnable thread periodically checks for conflicts between the in-memory
markers created by the current writer and the markers persisted to storage by other active writers. If any conflict is
detected, this thread updates the value of hasConflict, which is used to respond to subsequent marker creation requests.
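The periodic checker pattern can be sketched with plain JDK concurrency primitives. The class name and the `Supplier` probe are illustrative assumptions, not Hudi's actual implementation; in Hudi the probe would compare this writer's in-memory markers against the marker files persisted by other active writers:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Sketch of the MarkerCheckerRunnable pattern: a periodic task flips an
// AtomicBoolean that marker-creation requests consult before being answered.
class AsyncMarkerConflictChecker implements AutoCloseable {
    private final AtomicBoolean hasConflict = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    AsyncMarkerConflictChecker(Supplier<Boolean> conflictProbe,
                               long initialDelayMs, long periodMs) {
        // Periodically run the probe; once a conflict is seen it is remembered,
        // so every later create-marker request is rejected.
        scheduler.scheduleAtFixedRate(() -> {
            if (conflictProbe.get()) {
                hasConflict.set(true);
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    // Consulted by the create-marker API before answering a request.
    boolean hasConflict() {
        return hasConflict.get();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```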

We ran a performance test of the async conflict checker using a local file system and two concurrent writers, based on
1500+ inflight markers located in MARKER0~MARKER19. It took 179 ms to finish the conflict check.

Here we need to pay attention to the following details:

Firstly, there may be a concurrency issue between reading and writing the same MARKER file, causing an EOFException. In
order to simplify the model, we return an empty marker result when catching any exception while reading a specific
MARKER file, and let a subsequent check make up for it. For example, writer1 reads writer2's MARKER0 file, but at that
moment writer2 is overwriting the MARKER0 file; writer1 then gets an empty result from reading writer2's MARKER0. In
extreme cases, this delays the early conflict detection. Fortunately, we can tune the frequency of the scheduled check
according to the actual workload.

Secondly, the marker checker runnable only looks at instants smaller than the current writer's instant for conflict detection.
This is to avoid two writers discovering conflicts with each other at the same time and failing together. Such conflict
detection logic based on the instant time is consistent with the existing pre-commit conflict detection.
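Under the stated convention that instant times are lexicographically comparable timestamp strings, the ordering rule might look like the following sketch (the class and method names are illustrative):

```java
// Illustrative helper; assumes instant times are the usual lexicographically
// ordered timestamp strings (e.g. "20221003090000").
class InstantOrdering {
    // The current writer only treats markers from writers with a *smaller*
    // instant time as conflicts, so of two mutually conflicting writers,
    // exactly one (the later one) fails.
    static boolean shouldCheckAgainst(String currentInstant, String otherInstant) {
        return otherInstant.compareTo(currentInstant) < 0;
    }
}
```

This way writer1 (started at 9:00) ignores writer2's markers (started at 10:00), while writer2 detects the conflict and fails.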

### [2] Check Commit Conflict: Why do we still need to check commit conflicts here?

As we know, when a writer completes writing and commits, the corresponding markers are deleted immediately. Other
writers would miss the conflict if detection were based solely on the markers.

Let's take Figure 4 as an example

![](figure4.png)

Writer1 starts writing data at time t1 and finishes at time t3. Writer2 starts writing at time t2 and, at time t4,
tries to create a marker for a file already updated by writer1. Since all markers of writer1 have been deleted by time
t4, such a conflict cannot
be found during marker conflict detection until the writer starts to commit. In order to avoid such a delay of early
conflict detection, it is necessary to add a commit conflict check to the detection.

```
1. Get and cache all completed instants before initializing the write handler, as committed_instants_before.
2. Create and reload a new active timeline after the marker conflict check finishes, as committed_instants_after.
3. Compute the diff between committed_instants_before and committed_instants_after, as committed_instants_during.
4. Read all the changed fileIds contained in committed_instants_during, as fileId_set_during.
5. Check whether the fileId currently being written overlaps with fileId_set_during.
```
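The steps above can be sketched with plain sets. The helper names and the stand-in strings are assumptions for illustration; in Hudi the instants come from reloading the active timeline and the fileIds from reading the commit metadata:

```java
import java.util.HashSet;
import java.util.Set;

// Stand-in for the commit-conflict check described above.
class CommitConflictCheck {
    // Step 3: instants that completed while this writer was running.
    static Set<String> committedDuring(Set<String> before, Set<String> after) {
        Set<String> during = new HashSet<>(after);
        during.removeAll(before);
        return during;
    }

    // Step 5: does the fileId being written overlap the fileIds changed
    // by those freshly committed instants?
    static boolean conflicts(String writingFileId, Set<String> fileIdsChangedDuring) {
        return fileIdsChangedDuring.contains(writingFileId);
    }
}
```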

### [3] Try to resolve conflict

For now, the default behavior is that write handlers running on executors that hit a conflict throw a
`HoodieEarlyConflictDetectionException`. These tasks then fail, and the Hudi write transaction retries.

As for HoodieDeltaStreamer, when a marker conflict is detected, the corresponding writing task fails and retries. If
the retries reach the configured limit, the current stage fails. At this point, the behavior is consistent with
the existing OCC based conflict detection.

## Configuration

This RFC adds a feature flag and three new configs to control the behavior of early conflict detection:

1. `hoodie.write.lock.early.conflict.detection.enable`, default false. Enables early conflict detection based on markers. It tries to detect writing conflicts before creating markers and fails fast, releasing cluster resources as soon as possible.
2. `hoodie.write.lock.early.conflict.async.checker.batch.interval`, default 30000L. Used for the timeline-server-based AsyncTimelineMarkerEarlyConflictDetectionStrategy; the delay before the first async marker conflict check.
3. `hoodie.write.lock.early.conflict.async.checker.period`, default 30000L. Used for the timeline-server-based AsyncTimelineMarkerEarlyConflictDetectionStrategy; the period between marker conflict checks.
4. `hoodie.write.lock.early.conflict.detection.strategy`, default AsyncTimelineMarkerEarlyConflictDetectionStrategy. The early conflict detection class name; this should be a subclass of org.apache.hudi.common.model.HoodieEarlyConflictDetectionStrategy.
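A sketch of how these options might be set together in a writer's properties, assuming the standard OCC configs (concurrency mode, lock provider) are already in place; the fully qualified strategy class name is elided because it depends on the package layout of the implementation:

```
# Existing multi-writer / OCC setting (for context)
hoodie.write.concurrency.mode=optimistic_concurrency_control

# New configs proposed by this RFC
hoodie.write.lock.early.conflict.detection.enable=true
hoodie.write.lock.early.conflict.async.checker.batch.interval=30000
hoodie.write.lock.early.conflict.async.checker.period=30000
# Fully qualified class name of a HoodieEarlyConflictDetectionStrategy subclass
hoodie.write.lock.early.conflict.detection.strategy=<package>.AsyncTimelineMarkerEarlyConflictDetectionStrategy
```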


## Rollout/Adoption Plan

- The feature is guarded by the feature flag `isEarlyConflictDetectionEnable` (off by default), so it has no impact on existing users.

## Test Plan

Performance tests and a benchmark between two concurrent writers:

1. Case 1: enable early conflict detection.
2. Case 2: disable early conflict detection.

Compare the execution times.