-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[HUDI-1575] Early Conflict Detection For Multi-writer #6133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 38 commits
79feeb3
6331439
de69c0e
fcaaf9d
553fb00
dbe3db8
66b7d1b
64819e4
678cce4
5842dcf
645766d
e23ab61
5d0d05f
465536f
c6bc22d
7d8f3bc
fc5927a
3bde14b
ea2719e
844b10a
71e0d1e
374212b
8dfdb4a
6fc5bf1
316e5ae
0b74647
c3403d7
345a9df
ffd8315
6ec57fe
1455ab1
e13ebb9
8a402c4
a3d0a47
3369e5e
b97bb16
1ccecb4
869baf7
0447a71
6fdf901
c973c81
c412635
6bb1974
6d19d03
b90ea04
3f2118a
be0d5b4
aad218a
1b837ec
c34fb52
67b3892
a2980b7
0579f9b
2976167
7344fab
501e47f
46e80ae
0a77616
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,16 +18,20 @@ | |
|
|
||
| package org.apache.hudi.client.transaction.lock; | ||
|
|
||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics; | ||
| import org.apache.hudi.common.config.LockConfiguration; | ||
| import org.apache.hudi.common.config.SerializableConfiguration; | ||
| import org.apache.hudi.common.config.TypedProperties; | ||
| import org.apache.hudi.common.lock.LockProvider; | ||
| import org.apache.hudi.common.util.ReflectionUtils; | ||
| import org.apache.hudi.common.util.StringUtils; | ||
| import org.apache.hudi.config.HoodieLockConfig; | ||
| import org.apache.hudi.config.HoodieWriteConfig; | ||
| import org.apache.hudi.exception.HoodieLockException; | ||
|
|
||
| import org.apache.hadoop.fs.FileSystem; | ||
| import org.apache.hudi.exception.HoodieNotSupportedException; | ||
| import org.apache.log4j.LogManager; | ||
| import org.apache.log4j.Logger; | ||
|
|
||
|
|
@@ -43,23 +47,52 @@ | |
| public class LockManager implements Serializable, AutoCloseable { | ||
|
|
||
| private static final Logger LOG = LogManager.getLogger(LockManager.class); | ||
| private final HoodieWriteConfig writeConfig; | ||
| private final LockConfiguration lockConfiguration; | ||
| private final SerializableConfiguration hadoopConf; | ||
| private final int maxRetries; | ||
| private final long maxWaitTimeInMs; | ||
| private HoodieWriteConfig writeConfig; | ||
| private LockConfiguration lockConfiguration; | ||
| private SerializableConfiguration hadoopConf; | ||
| private int maxRetries; | ||
| private long maxWaitTimeInMs; | ||
yihua marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| private transient HoodieLockMetrics metrics; | ||
| private volatile LockProvider lockProvider; | ||
|
|
||
| public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) { | ||
| init(writeConfig, fs.getConf(), writeConfig.getProps()); | ||
| } | ||
|
|
||
| /** | ||
| * Try to have a lock at partitionPath + fileID level for different write handler. | ||
| * @param writeConfig | ||
| * @param fs | ||
| * @param partitionPath | ||
| * @param fileId | ||
| */ | ||
| public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, String partitionPath, String fileId) { | ||
yihua marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| TypedProperties props = refreshLockConfig(writeConfig, partitionPath + "/" + fileId); | ||
| init(writeConfig, fs.getConf(), props); | ||
| } | ||
|
|
||
| private void init(HoodieWriteConfig writeConfig, Configuration conf, TypedProperties lockProps) { | ||
| this.lockConfiguration = new LockConfiguration(lockProps); | ||
| this.writeConfig = writeConfig; | ||
| this.hadoopConf = new SerializableConfiguration(fs.getConf()); | ||
| this.lockConfiguration = new LockConfiguration(writeConfig.getProps()); | ||
| maxRetries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, | ||
| this.hadoopConf = new SerializableConfiguration(conf); | ||
| this.maxRetries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, | ||
| Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue())); | ||
| maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, | ||
| this.maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, | ||
| Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())); | ||
| metrics = new HoodieLockMetrics(writeConfig); | ||
| this.metrics = new HoodieLockMetrics(writeConfig); | ||
| } | ||
|
|
||
| /** | ||
| * rebuild lock related configs, only support ZK related lock for now. | ||
| */ | ||
| private TypedProperties refreshLockConfig(HoodieWriteConfig writeConfig, String key) { | ||
yihua marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| TypedProperties props = new TypedProperties(writeConfig.getProps()); | ||
| String zkBasePath = props.getProperty(LockConfiguration.ZK_BASE_PATH_PROP_KEY); | ||
| if (StringUtils.isNullOrEmpty(zkBasePath)) { | ||
| throw new HoodieNotSupportedException("Only Support ZK based lock for now."); | ||
| } | ||
| props.setProperty(LockConfiguration.ZK_LOCK_KEY_PROP_KEY, key); | ||
|
||
| return props; | ||
| } | ||
|
|
||
| public void lock() { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,13 +19,18 @@ | |
| package org.apache.hudi.table.marker; | ||
|
|
||
| import org.apache.hudi.common.config.SerializableConfiguration; | ||
| import org.apache.hudi.common.conflict.detection.HoodieDirectMarkerBasedEarlyConflictDetectionStrategy; | ||
| import org.apache.hudi.common.engine.HoodieEngineContext; | ||
| import org.apache.hudi.common.fs.FSUtils; | ||
| import org.apache.hudi.common.model.IOType; | ||
| import org.apache.hudi.common.table.HoodieTableMetaClient; | ||
| import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; | ||
| import org.apache.hudi.common.table.timeline.HoodieInstant; | ||
| import org.apache.hudi.common.util.HoodieTimer; | ||
| import org.apache.hudi.common.util.MarkerUtils; | ||
| import org.apache.hudi.common.util.Option; | ||
| import org.apache.hudi.common.util.ReflectionUtils; | ||
| import org.apache.hudi.config.HoodieWriteConfig; | ||
| import org.apache.hudi.exception.HoodieException; | ||
| import org.apache.hudi.exception.HoodieIOException; | ||
| import org.apache.hudi.table.HoodieTable; | ||
|
|
@@ -155,6 +160,20 @@ protected Option<Path> create(String partitionPath, String dataFileName, IOType | |
| return create(getMarkerPath(partitionPath, dataFileName, type), checkIfExists); | ||
| } | ||
|
|
||
| @Override | ||
| public Option<Path> createWithEarlyConflictDetection(String partitionPath, String dataFileName, IOType type, boolean checkIfExists, Set<HoodieInstant> completedCommitInstants, | ||
yihua marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| HoodieWriteConfig config, String fileId, HoodieActiveTimeline activeTimeline) { | ||
|
|
||
| long maxAllowableHeartbeatIntervalInMs = config.getHoodieClientHeartbeatIntervalInMs() * config.getHoodieClientHeartbeatTolerableMisses(); | ||
|
|
||
| HoodieDirectMarkerBasedEarlyConflictDetectionStrategy strategy = | ||
| (HoodieDirectMarkerBasedEarlyConflictDetectionStrategy) ReflectionUtils.loadClass(config.getEarlyConflictDetectionStrategyClassName(), | ||
|
||
| basePath, fs, partitionPath, fileId, instantTime, activeTimeline, config, config.earlyConflictDetectionCheckCommitConflict(), maxAllowableHeartbeatIntervalInMs, completedCommitInstants); | ||
|
|
||
| strategy.detectAndResolveConflictIfNecessary(); | ||
| return create(getMarkerPath(partitionPath, dataFileName, type), checkIfExists); | ||
| } | ||
|
|
||
| private Option<Path> create(Path markerPath, boolean checkIfExists) { | ||
| HoodieTimer timer = HoodieTimer.start(); | ||
| Path dirPath = markerPath.getParent(); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.