-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[HUDI-3634] Could read empty or partial HoodieCommitMetaData in downstream if using HDFS #5048
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
b03f1f3
bfedf3e
64cdc45
74f6fcf
63d017a
31c2e0a
3f4bfbd
c0cece1
178249f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
| import org.apache.hudi.common.metrics.Registry; | ||
| import org.apache.hudi.common.table.HoodieTableMetaClient; | ||
| import org.apache.hudi.common.util.HoodieTimer; | ||
| import org.apache.hudi.common.util.Option; | ||
| import org.apache.hudi.exception.HoodieException; | ||
| import org.apache.hudi.exception.HoodieIOException; | ||
|
|
||
|
|
@@ -60,6 +61,8 @@ | |
| import java.util.concurrent.ConcurrentMap; | ||
| import java.util.concurrent.TimeoutException; | ||
|
|
||
| import static org.apache.hudi.common.fs.StorageSchemes.HDFS; | ||
|
|
||
| /** | ||
| * HoodieWrapperFileSystem wraps the default file system. It holds state about the open streams in the file system to | ||
| * support getting the written size to each of the open streams. | ||
|
|
@@ -68,6 +71,8 @@ public class HoodieWrapperFileSystem extends FileSystem { | |
|
|
||
| public static final String HOODIE_SCHEME_PREFIX = "hoodie-"; | ||
|
|
||
| private static final String TMP_PATH_POSTFIX = ".tmp"; | ||
|
|
||
| protected enum MetricName { | ||
| create, rename, delete, listStatus, mkdirs, getFileStatus, globStatus, listFiles, read, write | ||
| } | ||
|
|
@@ -986,7 +991,63 @@ public long getBytesWritten(Path file) { | |
| file.toString() + " does not have a open stream. Cannot get the bytes written on the stream"); | ||
| } | ||
|
|
||
| protected boolean needCreateTempFile() { | ||
| return HDFS.getScheme().equals(fileSystem.getScheme()); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a new file with overwrite set to false. This ensures files are created | ||
| * only once and never rewritten, also, here we take care if the content is not | ||
| * empty, will first write the content to a temp file if {needCreateTempFile} is | ||
| * true, and then rename it back after the content is written. | ||
| * | ||
| * @param fullPath File Path | ||
| * @param content Content to be stored | ||
| */ | ||
| public void createImmutableFileInPath(Path fullPath, Option<byte[]> content) | ||
| throws HoodieIOException { | ||
| FSDataOutputStream fsout = null; | ||
| Path tmpPath = null; | ||
|
|
||
| boolean needTempFile = needCreateTempFile(); | ||
|
|
||
| try { | ||
| if (!content.isPresent()) { | ||
| fsout = fileSystem.create(fullPath, false); | ||
| } | ||
|
|
||
| if (content.isPresent() && needTempFile) { | ||
| Path parent = fullPath.getParent(); | ||
| tmpPath = new Path(parent, fullPath.getName() + TMP_PATH_POSTFIX); | ||
| fsout = fileSystem.create(tmpPath, false); | ||
| fsout.write(content.get()); | ||
| } | ||
|
|
||
| if (content.isPresent() && !needTempFile) { | ||
| fsout = fileSystem.create(fullPath, false); | ||
| fsout.write(content.get()); | ||
| } | ||
| } catch (IOException e) { | ||
| throw new HoodieIOException("Failed to create file " + fullPath, e); | ||
| } finally { | ||
| try { | ||
| if (null != fsout) { | ||
| fsout.close(); | ||
| } | ||
| if (null != tmpPath) { | ||
| fileSystem.rename(tmpPath, fullPath); | ||
| } | ||
| } catch (IOException e) { | ||
| throw new HoodieIOException("Failed to close file " + fullPath, e); | ||
|
||
| } | ||
| } | ||
| } | ||
|
|
||
| public FileSystem getFileSystem() { | ||
| return fileSystem; | ||
| } | ||
|
|
||
| public ConsistencyGuard getConsistencyGuard() { | ||
| return consistencyGuard; | ||
| } | ||
|
||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.