diff --git a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java index b3110a7a4..2409debc5 100644 --- a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java +++ b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java @@ -18,11 +18,13 @@ */ package org.apache.bigtop.manager.stack.bigtop.v3_3_0.hadoop; +import org.apache.bigtop.manager.common.shell.ShellResult; import org.apache.bigtop.manager.grpc.payload.ComponentCommandPayload; import org.apache.bigtop.manager.stack.bigtop.param.BigtopParams; import org.apache.bigtop.manager.stack.core.annotations.GlobalParams; import org.apache.bigtop.manager.stack.core.spi.param.Params; import org.apache.bigtop.manager.stack.core.utils.LocalSettings; +import org.apache.bigtop.manager.stack.core.utils.linux.LinuxOSUtils; import org.apache.commons.lang3.StringUtils; @@ -125,6 +127,9 @@ public Map hdfsSite() { ((String) hdfsSite.get("dfs.namenode.https-address")).replace("0.0.0.0", namenodeList.get(0))); } + // Configure native library dependent settings + configureNativeLibraryDependentSettings(hdfsSite); + dfsDataDir = (String) hdfsSite.get("dfs.datanode.data.dir"); dfsNameNodeDir = (String) hdfsSite.get("dfs.namenode.name.dir"); nameNodeFormattedDirs = Arrays.stream(dfsNameNodeDir.split(",")) @@ -193,4 +198,266 @@ public String binDir() { public String getServiceName() { return "hadoop"; } + + /** + * Configure native library dependent settings for HDFS. + * This method intelligently detects libhadoop native library availability + * and automatically configures short-circuit reads and UNIX domain socket settings. + *

+ * Short-circuit read optimization explanation: + * - When client and DataNode are on the same node, network layer can be bypassed + * to read local data blocks directly + * - Requires glibc version >= 2.34 to ensure native library compatibility + * - Uses UNIX domain sockets for inter-process communication to improve performance + * + * @param hdfsSite The HDFS site configuration map to be modified + */ + private void configureNativeLibraryDependentSettings(Map hdfsSite) { + try { + // Detect system glibc version to determine native library support + boolean enableShortCircuit = isGlibcVersionCompatible(); + String domainSocketPath = null; + + if (enableShortCircuit) { + log.info("Detected glibc version >= 2.34, enabling short-circuit read optimization"); + + // Get recommended domain socket path and append port placeholder + domainSocketPath = findOptimalDomainSocketPath(); + if (domainSocketPath != null) { + // _PORT placeholder will be replaced with actual port number by DataNode at runtime + domainSocketPath = domainSocketPath + "/dn._PORT"; + log.info("Enabling short-circuit reads with domain socket path: {}", domainSocketPath); + } + } else { + log.info("glibc version < 2.34 or detection failed, disabling short-circuit reads for compatibility"); + } + + // Apply short-circuit read configuration + applyShortCircuitConfiguration(hdfsSite, enableShortCircuit, domainSocketPath); + + } catch (Exception e) { + log.error("Error occurred during glibc version detection, disabling short-circuit reads for safety", e); + applyShortCircuitConfiguration(hdfsSite, false, null); + } + } + + /** + * Check if glibc version is >= 2.34 to determine native library support. + *

+ * Detection logic: + * 1. First attempt to use 'ldd --version' command to get glibc version + * 2. If failed, try 'getconf GNU_LIBC_VERSION' as fallback method + * 3. Parse version number and compare with minimum required version (2.34) + * + * @return true if glibc version >= 2.34, false otherwise + */ + private boolean isGlibcVersionCompatible() { + try { + // Method 1: Use ldd command to detect glibc version + ShellResult result = LinuxOSUtils.execCmd("ldd --version"); + if (result.getExitCode() == 0) { + String output = result.getOutput(); + String[] lines = output.split("\n"); + for (String line : lines) { + // Look for lines containing glibc version information + if (line.contains("GNU libc") || line.contains("GLIBC")) { + String version = extractGlibcVersionFromLine(line); + if (version != null) { + boolean supported = compareVersionStrings(version, "2.34") >= 0; + log.info("Detected glibc version via ldd: {}, supported: {}", version, supported); + return supported; + } + } + } + } else { + log.info("ldd --version command failed with exit code: {}", result.getExitCode()); + } + + // Method 2: Try getconf as fallback detection method + return detectGlibcVersionViaGetconf(); + + } catch (Exception e) { + log.info("Exception during glibc version detection: {}", e.getMessage()); + return detectGlibcVersionViaGetconf(); + } + } + + /** + * Alternative method using getconf command to detect glibc version. + * + * @return true if detected version >= 2.34, false otherwise + */ + private boolean detectGlibcVersionViaGetconf() { + try { + ShellResult result = LinuxOSUtils.execCmd("getconf GNU_LIBC_VERSION"); + if (result.getExitCode() == 0) { + String output = result.getOutput().trim(); + if (output.startsWith("glibc ")) { + String version = output.substring(6).trim(); + boolean supported = compareVersionStrings(version, "2.34") >= 0; + log.info("Detected glibc version via getconf: {}, supported: {}", version, supported); + return supported; + } + } + } catch (Exception e) { + log.info("getconf method detection failed: {}", e.getMessage()); + } + + // Default to false for safety + log.warn("Could not determine glibc version, defaulting to disable short-circuit reads"); + return false; + } + + /** + * Extract glibc version number from ldd output line. + *

+ * Supported format examples: + * - "ldd (GNU libc) 2.35" + * - "ldd (Ubuntu GLIBC 2.35-0ubuntu3.1) 2.35" + * - "ldd (GNU libc) 2.34" + * + * @param line Single line of text from ldd command output + * @return Extracted version string like "2.35", or null if extraction failed + */ + private String extractGlibcVersionFromLine(String line) { + // Split line by whitespace and look for version pattern + String[] parts = line.split("\\s+"); + for (String part : parts) { + // Match version pattern like "2.35" + if (part.matches("\\d+\\.\\d+.*")) { + // Extract major.minor version numbers + String cleanVersion = part.replaceAll("[^\\d.]", ""); + // Ensure only major and minor versions are kept + String[] versionParts = cleanVersion.split("\\."); + if (versionParts.length >= 2) { + return versionParts[0] + "." + versionParts[1]; + } + return cleanVersion; + } + } + return null; + } + + /** + * Compare two version strings (major.minor format). + * + * @param v1 First version string + * @param v2 Second version string + * @return negative if v1 < v2, zero if equal, positive if v1 > v2 + */ + private int compareVersionStrings(String v1, String v2) { + String[] parts1 = v1.split("\\."); + String[] parts2 = v2.split("\\."); + + int major1 = Integer.parseInt(parts1[0]); + int minor1 = parts1.length > 1 ? Integer.parseInt(parts1[1]) : 0; + + int major2 = Integer.parseInt(parts2[0]); + int minor2 = parts2.length > 1 ? Integer.parseInt(parts2[1]) : 0; + + // Compare major version first + if (major1 != major2) { + return major1 - major2; + } + // Compare minor version when major versions are equal + return minor1 - minor2; + } + + /** + * Find the optimal domain socket path. + *

+ * Path selection strategy: + * 1. Check candidate paths in priority order for existence and writability + * 2. If none are available, attempt to create default directory + * 3. Finally fall back to /tmp directory + * + * @return Recommended domain socket base path + */ + private String findOptimalDomainSocketPath() { + // Candidate paths in priority order + String[] candidatePaths = {"/var/run/hadoop", "/tmp/hadoop", "/tmp"}; + + // Check availability of existing paths + for (String path : candidatePaths) { + java.io.File dir = new java.io.File(path); + if (dir.exists() && dir.canWrite()) { + log.info("Selected domain socket path: {}", path); + return path; + } + } + + // Try to create default hadoop directory + java.io.File defaultDir = new java.io.File("/tmp/hadoop"); + if (!defaultDir.exists()) { + try { + if (defaultDir.mkdirs()) { + log.info("Created and using domain socket path: /tmp/hadoop"); + return "/tmp/hadoop"; + } + } catch (Exception e) { + log.warn("Cannot create directory /tmp/hadoop, using /tmp as fallback", e); + } + } + + log.info("Using fallback domain socket path: /tmp"); + return "/tmp"; + } + + /** + * Apply short-circuit read settings in HDFS site configuration. + *

+ * Configuration properties explanation: + * - dfs.client.read.shortcircuit: Whether to enable short-circuit reads + * - dfs.domain.socket.path: UNIX domain socket path + * - dfs.client.read.shortcircuit.streams.cache.size: Short-circuit read stream cache size + * + * @param hdfsSite HDFS site configuration map + * @param enableShortCircuit Whether to enable short-circuit reads + * @param domainSocketPath Domain socket path (null to disable domain socket) + */ + private void applyShortCircuitConfiguration( + Map hdfsSite, boolean enableShortCircuit, String domainSocketPath) { + + // Configure short-circuit read main switch + hdfsSite.put("dfs.client.read.shortcircuit", String.valueOf(enableShortCircuit)); + + if (enableShortCircuit && domainSocketPath != null) { + // Enable UNIX domain socket for high-performance short-circuit reads + hdfsSite.put("dfs.domain.socket.path", domainSocketPath); + log.info("Short-circuit reads enabled with domain socket path: {}", domainSocketPath); + } else { + // Remove domain socket path configuration to prevent DataNode startup failures + // This avoids startup errors due to libhadoop loading issues + hdfsSite.remove("dfs.domain.socket.path"); + if (enableShortCircuit) { + log.info("Short-circuit reads enabled (fallback mode, without domain socket)"); + } else { + log.info("Short-circuit reads disabled"); + } + } + + // Configure stream cache based on short-circuit read status + configureShortCircuitStreamCache(hdfsSite, enableShortCircuit); + } + + /** + * Configure short-circuit read stream cache settings. + * + * @param hdfsSite HDFS site configuration map + * @param enableShortCircuit Whether short-circuit reads are enabled + */ + private void configureShortCircuitStreamCache(Map hdfsSite, boolean enableShortCircuit) { + if (enableShortCircuit) { + // Optimize cache size when short-circuit reads are enabled for better performance + Object currentCacheSize = hdfsSite.get("dfs.client.read.shortcircuit.streams.cache.size"); + if (currentCacheSize == null || "0".equals(currentCacheSize.toString())) { + hdfsSite.put("dfs.client.read.shortcircuit.streams.cache.size", "4096"); + log.info("Configured short-circuit read stream cache size to 4096"); + } + } else { + // Set cache to 0 when short-circuit reads are disabled to save memory + hdfsSite.put("dfs.client.read.shortcircuit.streams.cache.size", "0"); + log.info("Short-circuit read stream cache disabled"); + } + } }