Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,14 @@ public class HoodieTableConfig extends HoodieConfig {
.sinceVersion("0.11.0")
.withDocumentation("Table checksum is used to guard against partial writes in HDFS. It is added as the last entry in hoodie.properties and then used to validate while reading table config.");

public static final ConfigProperty<Boolean> ALLOW_TEMP_COMMIT = ConfigProperty
.key("hoodie.allow.temp.commit")
.defaultValue(true)
.sinceVersion("0.11.0")
.withDocumentation("Allow to create a temp commit first if there are contents to write, "
+ "will rename it after write is finished, this can avoid read empty or partial commit in the downstreams, "
+ "please be careful if enable this in object storage such as S3, as this will introduce new rename operations");

private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; // <database_name>.<table_name>

public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) {
Expand Down Expand Up @@ -578,6 +586,13 @@ public String getUrlEncodePartitioning() {
return getString(URL_ENCODE_PARTITIONING);
}

/**
* @returns true is allow temp commit. else returns false.
*/
public boolean allowTempCommit() {
return Boolean.parseBoolean(getStringOrDefault(ALLOW_TEMP_COMMIT));
}

/**
* Read the table checksum.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,9 +664,21 @@ private void createFileInMetaPath(String filename, Option<byte[]> content, boole
*/
private void createImmutableFileInPath(Path fullPath, Option<byte[]> content) {
FSDataOutputStream fsout = null;
Path tmpPath = null;
try {
fsout = metaClient.getFs().create(fullPath, false);
if (content.isPresent()) {
if (!content.isPresent()) {
fsout = metaClient.getFs().create(fullPath, false);
}

if (content.isPresent() && metaClient.getTableConfig().allowTempCommit()) {
Path parent = fullPath.getParent();
tmpPath = new Path(parent, fullPath.getName() + ".tmp");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's extract ".tmp" to a constant.

fsout = metaClient.getFs().create(tmpPath, false);
fsout.write(content.get());
}

if (content.isPresent() && !metaClient.getTableConfig().allowTempCommit()) {
fsout = metaClient.getFs().create(fullPath, false);
fsout.write(content.get());
}
} catch (IOException e) {
Expand All @@ -676,6 +688,9 @@ private void createImmutableFileInPath(Path fullPath, Option<byte[]> content) {
if (null != fsout) {
fsout.close();
}
if (null != tmpPath) {
metaClient.getFs().rename(tmpPath, fullPath);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are renames atomic on S3 or other cloud object storage? I know put and delete in s3 is strongly consistent but what about copy?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we cannot rename :) is there a way to do this specific to hdfs?

Copy link
Contributor Author

@boneanxs boneanxs Apr 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can our ConsistencyGuard guarantee this?

Orz,checked ConsistencyGuard cannot guarentee it...

}
} catch (IOException e) {
throw new HoodieIOException("Failed to close file " + fullPath, e);
}
Expand Down