@@ -18,11 +18,13 @@
*/
package org.apache.bigtop.manager.stack.bigtop.v3_3_0.hadoop;

import org.apache.bigtop.manager.common.shell.ShellResult;
import org.apache.bigtop.manager.grpc.payload.ComponentCommandPayload;
import org.apache.bigtop.manager.stack.bigtop.param.BigtopParams;
import org.apache.bigtop.manager.stack.core.annotations.GlobalParams;
import org.apache.bigtop.manager.stack.core.spi.param.Params;
import org.apache.bigtop.manager.stack.core.utils.LocalSettings;
import org.apache.bigtop.manager.stack.core.utils.linux.LinuxOSUtils;

import org.apache.commons.lang3.StringUtils;

@@ -125,6 +127,9 @@ public Map<String, Object> hdfsSite() {
((String) hdfsSite.get("dfs.namenode.https-address")).replace("0.0.0.0", namenodeList.get(0)));
}

// Configure native library dependent settings
configureNativeLibraryDependentSettings(hdfsSite);

dfsDataDir = (String) hdfsSite.get("dfs.datanode.data.dir");
dfsNameNodeDir = (String) hdfsSite.get("dfs.namenode.name.dir");
nameNodeFormattedDirs = Arrays.stream(dfsNameNodeDir.split(","))
@@ -193,4 +198,266 @@ public String binDir() {
public String getServiceName() {
return "hadoop";
}

/**
* Configure native library dependent settings for HDFS.
* This method detects libhadoop native library availability and automatically
* configures short-circuit reads and UNIX domain socket settings.
* <p>
* Short-circuit read optimization explanation:
* - When the client and DataNode are on the same node, the network layer can be bypassed
* to read local data blocks directly
* - Requires glibc version >= 2.34 to ensure native library compatibility
* - Uses UNIX domain sockets for inter-process communication to improve performance
*
* @param hdfsSite The HDFS site configuration map to be modified
*/
private void configureNativeLibraryDependentSettings(Map<String, Object> hdfsSite) {
try {
// Detect system glibc version to determine native library support
boolean enableShortCircuit = isGlibcVersionCompatible();
String domainSocketPath = null;

if (enableShortCircuit) {
log.info("Detected glibc version >= 2.34, enabling short-circuit read optimization");

// Get recommended domain socket path and append port placeholder
domainSocketPath = findOptimalDomainSocketPath();
if (domainSocketPath != null) {
// _PORT placeholder will be replaced with actual port number by DataNode at runtime
domainSocketPath = domainSocketPath + "/dn._PORT";
log.info("Enabling short-circuit reads with domain socket path: {}", domainSocketPath);
}
} else {
log.info("glibc version < 2.34 or detection failed, disabling short-circuit reads for compatibility");
}

// Apply short-circuit read configuration
applyShortCircuitConfiguration(hdfsSite, enableShortCircuit, domainSocketPath);

} catch (Exception e) {
log.error("Error occurred during glibc version detection, disabling short-circuit reads for safety", e);
applyShortCircuitConfiguration(hdfsSite, false, null);
}
}

/**
* Check if glibc version is >= 2.34 to determine native library support.
* <p>
* Detection logic:
* 1. First attempt to use the 'ldd --version' command to get the glibc version
* 2. If that fails, try 'getconf GNU_LIBC_VERSION' as a fallback
* 3. Parse the version number and compare it with the minimum required version (2.34)
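* <p>
* Example (illustrative): on a host where 'ldd --version' prints "ldd (GNU libc) 2.35",
* the version "2.35" is extracted and compared against 2.34, so the check returns true.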
*
* @return true if glibc version >= 2.34, false otherwise
*/
private boolean isGlibcVersionCompatible() {
try {
// Method 1: Use ldd command to detect glibc version
ShellResult result = LinuxOSUtils.execCmd("ldd --version");
if (result.getExitCode() == 0) {
String output = result.getOutput();
String[] lines = output.split("\n");
for (String line : lines) {
// Look for lines containing glibc version information
if (line.contains("GNU libc") || line.contains("GLIBC")) {
String version = extractGlibcVersionFromLine(line);
if (version != null) {
boolean supported = compareVersionStrings(version, "2.34") >= 0;
log.info("Detected glibc version via ldd: {}, supported: {}", version, supported);
return supported;
}
}
}
} else {
log.info("ldd --version command failed with exit code: {}", result.getExitCode());
}

// Method 2: Try getconf as fallback detection method
return detectGlibcVersionViaGetconf();

} catch (Exception e) {
log.info("Exception during glibc version detection: {}", e.getMessage());
return detectGlibcVersionViaGetconf();
}
}

/**
* Fallback method that uses the getconf command to detect the glibc version.
*
* @return true if detected version >= 2.34, false otherwise
*/
private boolean detectGlibcVersionViaGetconf() {
try {
ShellResult result = LinuxOSUtils.execCmd("getconf GNU_LIBC_VERSION");
if (result.getExitCode() == 0) {
String output = result.getOutput().trim();
if (output.startsWith("glibc ")) {
String version = output.substring(6).trim();
boolean supported = compareVersionStrings(version, "2.34") >= 0;
log.info("Detected glibc version via getconf: {}, supported: {}", version, supported);
return supported;
}
}
} catch (Exception e) {
log.info("getconf method detection failed: {}", e.getMessage());
}

// Default to false for safety
log.warn("Could not determine glibc version, defaulting to disable short-circuit reads");
return false;
}

/**
* Extract glibc version number from ldd output line.
* <p>
* Supported format examples:
* - "ldd (GNU libc) 2.35"
* - "ldd (Ubuntu GLIBC 2.35-0ubuntu3.1) 2.35"
* - "ldd (GNU libc) 2.34"
*
* @param line Single line of text from ldd command output
* @return Extracted version string like "2.35", or null if extraction failed
*/
private String extractGlibcVersionFromLine(String line) {
// Split line by whitespace and look for a token starting with a version number
String[] parts = line.split("\\s+");
for (String part : parts) {
// Match tokens like "2.35" or "2.35-0ubuntu3.1)"
if (part.matches("\\d+\\.\\d+.*")) {
// Keep only the leading major.minor digits, dropping suffixes such as "-0ubuntu3.1"
java.util.regex.Matcher matcher =
java.util.regex.Pattern.compile("^(\\d+)\\.(\\d+)").matcher(part);
if (matcher.find()) {
return matcher.group(1) + "." + matcher.group(2);
}
}
}
return null;
}

/**
* Compare two version strings (major.minor format).
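* <p>
* Examples (illustrative): compareVersionStrings("2.35", "2.34") returns a positive value;
* compareVersionStrings("2.28", "2.34") returns a negative value.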
*
* @param v1 First version string
* @param v2 Second version string
* @return negative if v1 < v2, zero if equal, positive if v1 > v2
*/
private int compareVersionStrings(String v1, String v2) {
String[] parts1 = v1.split("\\.");
String[] parts2 = v2.split("\\.");

int major1 = Integer.parseInt(parts1[0]);
int minor1 = parts1.length > 1 ? Integer.parseInt(parts1[1]) : 0;

int major2 = Integer.parseInt(parts2[0]);
int minor2 = parts2.length > 1 ? Integer.parseInt(parts2[1]) : 0;

// Compare major version first
if (major1 != major2) {
return major1 - major2;
}
// Compare minor version when major versions are equal
return minor1 - minor2;
}

/**
* Find the optimal domain socket path.
* <p>
* Path selection strategy:
* 1. Check candidate paths in priority order for existence and writability
* 2. If none are available, attempt to create the default directory
* 3. Finally fall back to the /tmp directory
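* <p>
* Example (illustrative): on a node where /var/run/hadoop exists and is writable, that
* path is returned and the caller appends "/dn._PORT" to form the final socket path.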
*
* @return Recommended domain socket base path
*/
private String findOptimalDomainSocketPath() {
// Candidate paths in priority order
String[] candidatePaths = {"/var/run/hadoop", "/tmp/hadoop", "/tmp"};

// Check availability of existing paths
for (String path : candidatePaths) {
java.io.File dir = new java.io.File(path);
if (dir.exists() && dir.canWrite()) {
log.info("Selected domain socket path: {}", path);
return path;
}
}

// Try to create default hadoop directory
java.io.File defaultDir = new java.io.File("/tmp/hadoop");
if (!defaultDir.exists()) {
try {
if (defaultDir.mkdirs()) {
log.info("Created and using domain socket path: /tmp/hadoop");
return "/tmp/hadoop";
}
} catch (Exception e) {
log.warn("Cannot create directory /tmp/hadoop, using /tmp as fallback", e);
}
}

log.info("Using fallback domain socket path: /tmp");
return "/tmp";
}

/**
* Apply short-circuit read settings in HDFS site configuration.
* <p>
* Configuration properties explanation:
* - dfs.client.read.shortcircuit: Whether to enable short-circuit reads
* - dfs.domain.socket.path: UNIX domain socket path
* - dfs.client.read.shortcircuit.streams.cache.size: Short-circuit read stream cache size
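* <p>
* Illustrative outcome when enabled with /var/run/hadoop selected: dfs.client.read.shortcircuit=true,
* dfs.domain.socket.path=/var/run/hadoop/dn._PORT, and a stream cache size of 4096
* (applied via configureShortCircuitStreamCache).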
*
* @param hdfsSite HDFS site configuration map
* @param enableShortCircuit Whether to enable short-circuit reads
* @param domainSocketPath Domain socket path (null to disable domain socket)
*/
private void applyShortCircuitConfiguration(
Map<String, Object> hdfsSite, boolean enableShortCircuit, String domainSocketPath) {

// Configure short-circuit read main switch
hdfsSite.put("dfs.client.read.shortcircuit", String.valueOf(enableShortCircuit));

if (enableShortCircuit && domainSocketPath != null) {
// Enable UNIX domain socket for high-performance short-circuit reads
hdfsSite.put("dfs.domain.socket.path", domainSocketPath);
log.info("Short-circuit reads enabled with domain socket path: {}", domainSocketPath);
} else {
// Remove domain socket path configuration to prevent DataNode startup failures
// This avoids startup errors due to libhadoop loading issues
hdfsSite.remove("dfs.domain.socket.path");
if (enableShortCircuit) {
log.info("Short-circuit reads enabled (fallback mode, without domain socket)");
} else {
log.info("Short-circuit reads disabled");
}
}

// Configure stream cache based on short-circuit read status
configureShortCircuitStreamCache(hdfsSite, enableShortCircuit);
}

/**
* Configure short-circuit read stream cache settings.
*
* @param hdfsSite HDFS site configuration map
* @param enableShortCircuit Whether short-circuit reads are enabled
*/
private void configureShortCircuitStreamCache(Map<String, Object> hdfsSite, boolean enableShortCircuit) {
if (enableShortCircuit) {
// Optimize cache size when short-circuit reads are enabled for better performance
Object currentCacheSize = hdfsSite.get("dfs.client.read.shortcircuit.streams.cache.size");
if (currentCacheSize == null || "0".equals(currentCacheSize.toString())) {
hdfsSite.put("dfs.client.read.shortcircuit.streams.cache.size", "4096");
log.info("Configured short-circuit read stream cache size to 4096");
}
} else {
// Set cache to 0 when short-circuit reads are disabled to save memory
hdfsSite.put("dfs.client.read.shortcircuit.streams.cache.size", "0");
log.info("Short-circuit read stream cache disabled");
}
}
}
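For reference, below is a minimal, self-contained sketch (not part of the change above) of the hdfs-site entries that configureNativeLibraryDependentSettings is expected to produce in the two detection outcomes. The class name and main-method harness are hypothetical, added only for illustration.

import java.util.HashMap;
import java.util.Map;

public class ShortCircuitConfigSketch {

    public static void main(String[] args) {
        // Expected entries when glibc >= 2.34 is detected and /var/run/hadoop is writable
        Map<String, Object> enabled = new HashMap<>();
        enabled.put("dfs.client.read.shortcircuit", "true");
        enabled.put("dfs.domain.socket.path", "/var/run/hadoop/dn._PORT");
        enabled.put("dfs.client.read.shortcircuit.streams.cache.size", "4096");

        // Expected entries when glibc < 2.34 is detected or detection fails;
        // dfs.domain.socket.path is removed to avoid DataNode startup failures
        Map<String, Object> disabled = new HashMap<>();
        disabled.put("dfs.client.read.shortcircuit", "false");
        disabled.put("dfs.client.read.shortcircuit.streams.cache.size", "0");

        System.out.println("enabled  = " + enabled);
        System.out.println("disabled = " + disabled);
    }
}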