Skip to content

Commit ced81ff

Browse files
author
XuQianJin-Stars
committed
improve spillableMapBasePath disk directory is full
1 parent d36cc05 commit ced81ff

3 files changed

Lines changed: 47 additions & 7 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@
2222
import org.apache.hudi.common.config.ConfigGroups;
2323
import org.apache.hudi.common.config.ConfigProperty;
2424
import org.apache.hudi.common.config.HoodieConfig;
25+
import org.apache.hudi.common.util.FileIOUtils;
26+
import org.apache.hudi.common.util.Option;
2527

2628
import javax.annotation.concurrent.Immutable;
27-
2829
import java.io.File;
2930
import java.io.FileReader;
3031
import java.io.IOException;
@@ -80,7 +81,11 @@ public class HoodieMemoryConfig extends HoodieConfig {
8081
public static final ConfigProperty<String> SPILLABLE_MAP_BASE_PATH = ConfigProperty
8182
.key("hoodie.memory.spillable.map.path")
8283
.defaultValue("/tmp/")
83-
.withDocumentation("Default file path prefix for spillable map");
84+
.withInferFunction(cfg -> {
85+
String[] localDirs = FileIOUtils.getConfiguredLocalDirs();
86+
return (localDirs != null && localDirs.length > 0) ? Option.of(localDirs[0]) : Option.empty();
87+
})
88+
.withDocumentation("Default file path for spillable map");
8489

8590
public static final ConfigProperty<Double> WRITESTATUS_FAILURE_FRACTION = ConfigProperty
8691
.key("hoodie.memory.writestatus.failure.fraction")

hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
package org.apache.hudi.common.table.log;
2020

21+
import org.apache.avro.Schema;
22+
import org.apache.hadoop.fs.FileSystem;
23+
import org.apache.hadoop.fs.Path;
2124
import org.apache.hudi.common.config.HoodieCommonConfig;
2225
import org.apache.hudi.common.model.DeleteRecord;
2326
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -35,12 +38,7 @@
3538
import org.apache.hudi.common.util.SpillableMapUtils;
3639
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
3740
import org.apache.hudi.exception.HoodieIOException;
38-
39-
import org.apache.avro.Schema;
40-
import org.apache.hadoop.fs.FileSystem;
4141
import org.apache.hudi.internal.schema.InternalSchema;
42-
43-
import org.apache.hadoop.fs.Path;
4442
import org.apache.log4j.LogManager;
4543
import org.apache.log4j.Logger;
4644

@@ -97,6 +95,7 @@ protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<Stri
9795
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
9896
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
9997
new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);
98+
10099
this.maxMemorySizeInBytes = maxMemorySizeInBytes;
101100
} catch (IOException e) {
102101
throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);

hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,4 +204,40 @@ public static Option<byte[]> readDataFromPath(FileSystem fileSystem, org.apache.
204204
public static Option<byte[]> readDataFromPath(FileSystem fileSystem, org.apache.hadoop.fs.Path detailPath) {
205205
return readDataFromPath(fileSystem, detailPath, false);
206206
}
207+
208+
/**
209+
* Return the configured local directories where hudi can write files. This
210+
* method does not create any directories on its own, it only encapsulates the
211+
* logic of locating the local directories according to deployment mode.
212+
*/
213+
public static String[] getConfiguredLocalDirs() {
214+
if (isRunningInYarnContainer()) {
215+
// If we are in yarn mode, systems can have different disk layouts so we must set it
216+
// to what Yarn on this system said was available. Note this assumes that Yarn has
217+
// created the directories already, and that they are secured so that only the
218+
// user has access to them.
219+
return getYarnLocalDirs().split(",");
220+
} else if (System.getProperty("java.io.tmpdir") != null) {
221+
return System.getProperty("java.io.tmpdir").split(",");
222+
} else {
223+
return null;
224+
}
225+
}
226+
227+
public static boolean isRunningInYarnContainer() {
228+
// These environment variables are set by YARN.
229+
return System.getenv("CONTAINER_ID") != null;
230+
}
231+
232+
/**
233+
* Get the Yarn approved local directories.
234+
*/
235+
public static String getYarnLocalDirs() {
236+
String localDirs = Option.of(System.getenv("LOCAL_DIRS")).orElse("");
237+
238+
if (localDirs.isEmpty()) {
239+
throw new HoodieIOException("Yarn Local dirs can't be empty");
240+
}
241+
return localDirs;
242+
}
207243
}

0 commit comments

Comments
 (0)