diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/JvmUniqueIdProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/JvmUniqueIdProvider.java
new file mode 100644
index 0000000000000..3effbccb830a6
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/JvmUniqueIdProvider.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.util.Random;
+
+/**
+ * Provides a JVM-scoped identifier.
+ *
+ *
The identifier is generated once when the class is loaded and remains
+ * constant for the lifetime of the JVM. It is derived using a combination of
+ * the current system time and random entropy to reduce the likelihood of
+ * collisions across JVM instances.
+ *
+ * The identifier is intended for lightweight JVM-level identification,
+ * such as tagging metrics or log entries. It provides best-effort uniqueness
+ * and is not guaranteed to be globally unique.
+ *
+ * This class is utility-only and cannot be instantiated.
+ */
+public final class JvmUniqueIdProvider {
+
+ /** Lower bound (inclusive) for the generated JVM identifier. */
+ private static final int MIN_JVM_ID = 100_000;
+
+ /** Size of the identifier value range. */
+ private static final int JVM_ID_RANGE = 900_000;
+
+ /** Upper bound for random entropy mixed into the identifier. */
+ private static final int RANDOM_ENTROPY_BOUND = 1_000;
+
+ /** JVM-scoped identifier generated at class initialization time. */
+ private static final int JVM_UNIQUE_ID;
+
+ static {
+ long time = System.currentTimeMillis();
+ int random = new Random().nextInt(RANDOM_ENTROPY_BOUND);
+ JVM_UNIQUE_ID = (int) ((time + random) % JVM_ID_RANGE) + MIN_JVM_ID;
+ }
+
+ /** Prevents instantiation. */
+ private JvmUniqueIdProvider() {
+ }
+
+ /**
+ * Returns the JVM-scoped identifier.
+ *
+ * @return an identifier that remains constant for the lifetime of the JVM
+ */
+ public static int getJvmId() {
+ return JVM_UNIQUE_ID;
+ }
+}
+
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
index 24aecb2d9771d..de11037780b9b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
@@ -44,7 +44,6 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MEDIUM_HEAP_SPACE_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_REDUCTION_FACTOR;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_POOL_SIZE_INCREASE_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR;
@@ -55,7 +54,6 @@
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_UP;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THIRTY_SECONDS;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO_D;
/**
* Manages a thread pool for writing operations, adjusting the pool size based on CPU utilization.
@@ -91,11 +89,11 @@ public final class WriteThreadPoolSizeManager implements Closeable {
/* Tracks the last scale direction applied, or empty if none. */
private volatile String lastScaleDirection = EMPTY_STRING;
/* Maximum CPU utilization observed during the monitoring interval. */
- private volatile double maxJvmCpuUtilization = 0.0;
+ private volatile long maxJvmCpuUtilization = 0L;
/** High memory usage threshold used to trigger thread pool downscaling. */
- private final double highMemoryThreshold;
+ private final long highMemoryThreshold;
/** Low memory usage threshold used to allow thread pool upscaling. */
- private final double lowMemoryThreshold;
+ private final long lowMemoryThreshold;
/**
* Private constructor to initialize the write thread pool and CPU monitor executor
@@ -136,8 +134,8 @@ private WriteThreadPoolSizeManager(String filesystemName,
executor.allowCoreThreadTimeOut(true);
/* Create a scheduled executor for CPU monitoring and pool adjustment */
this.cpuMonitorExecutor = Executors.newScheduledThreadPool(1);
- highMemoryThreshold = abfsConfiguration.getWriteHighMemoryUsageThresholdPercent() / HUNDRED_D;
- lowMemoryThreshold = abfsConfiguration.getWriteLowMemoryUsageThresholdPercent() / HUNDRED_D;
+ highMemoryThreshold = abfsConfiguration.getWriteHighMemoryUsageThresholdPercent();
+ lowMemoryThreshold = abfsConfiguration.getWriteLowMemoryUsageThresholdPercent();
}
/** Returns the internal {@link AbfsConfiguration}. */
@@ -245,7 +243,7 @@ public synchronized void startCPUMonitoring() {
if (!isMonitoringStarted()) {
isMonitoringStarted = true;
cpuMonitorExecutor.scheduleAtFixedRate(() -> {
- double cpuUtilization = ResourceUtilizationUtils.getJvmCpuLoad();
+ long cpuUtilization = ResourceUtilizationUtils.getJvmCpuLoad();
LOG.debug("Current CPU Utilization is this: {}", cpuUtilization);
try {
adjustThreadPoolSizeBasedOnCPU(cpuUtilization);
@@ -266,24 +264,27 @@ public synchronized void startCPUMonitoring() {
* @param cpuUtilization Current system CPU utilization (0.0 to 1.0)
* @throws InterruptedException if the resizing operation is interrupted while acquiring the lock
*/
- public void adjustThreadPoolSizeBasedOnCPU(double cpuUtilization) throws InterruptedException {
+ public void adjustThreadPoolSizeBasedOnCPU(long cpuUtilization) throws InterruptedException {
lock.lock();
try {
ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool;
int currentPoolSize = executor.getMaximumPoolSize();
- double memoryLoad = ResourceUtilizationUtils.getMemoryLoad();
+ long memoryLoad = ResourceUtilizationUtils.getMemoryLoad();
+ long usedHeapMemory = ResourceUtilizationUtils.getUsedHeapMemory();
+ long availableMemory = ResourceUtilizationUtils.getAvailableHeapMemory();
+ long committedMemory = ResourceUtilizationUtils.getCommittedHeapMemory();
LOG.debug("The memory load is {} and CPU utilization is {}", memoryLoad, cpuUtilization);
- if (cpuUtilization > (abfsConfiguration.getWriteHighCpuThreshold()/HUNDRED_D)) {
+ if (cpuUtilization > (abfsConfiguration.getWriteHighCpuThreshold())) {
newMaxPoolSize = calculateReducedPoolSizeHighCPU(currentPoolSize, memoryLoad);
if (currentPoolSize == initialPoolSize && newMaxPoolSize == initialPoolSize) {
lastScaleDirection = SCALE_DIRECTION_NO_DOWN_AT_MIN;
}
- } else if (cpuUtilization > (abfsConfiguration.getWriteMediumCpuThreshold()/HUNDRED_D)) {
+ } else if (cpuUtilization > (abfsConfiguration.getWriteMediumCpuThreshold())) {
newMaxPoolSize = calculateReducedPoolSizeMediumCPU(currentPoolSize, memoryLoad);
if (currentPoolSize == initialPoolSize && newMaxPoolSize == initialPoolSize) {
lastScaleDirection = SCALE_DIRECTION_NO_DOWN_AT_MIN;
}
- } else if (cpuUtilization < (abfsConfiguration.getWriteLowCpuThreshold()/HUNDRED_D)) {
+ } else if (cpuUtilization < (abfsConfiguration.getWriteLowCpuThreshold())) {
newMaxPoolSize = calculateIncreasedPoolSizeLowCPU(currentPoolSize, memoryLoad);
if (currentPoolSize == maxThreadPoolSize && newMaxPoolSize == maxThreadPoolSize) {
lastScaleDirection = SCALE_DIRECTION_NO_UP_AT_MAX;
@@ -294,7 +295,8 @@ public void adjustThreadPoolSizeBasedOnCPU(double cpuUtilization) throws Interru
}
boolean willResize = newMaxPoolSize != currentPoolSize;
if (!willResize && !lastScaleDirection.equals(EMPTY_STRING)) {
- WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad);
+ WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad,
+ usedHeapMemory, availableMemory, committedMemory);
// Update the write thread pool metrics with the latest statistics snapshot.
writeThreadPoolMetrics.update(stats);
}
@@ -304,7 +306,8 @@ public void adjustThreadPoolSizeBasedOnCPU(double cpuUtilization) throws Interru
if (!willResize) {
try {
// Capture the latest thread pool statistics (pool size, CPU, memory, etc.).
- WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad);
+ WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad,
+ usedHeapMemory, availableMemory, committedMemory);
// Update the write thread pool metrics with the latest statistics snapshot.
writeThreadPoolMetrics.update(stats);
} catch (Exception e) {
@@ -320,7 +323,8 @@ public void adjustThreadPoolSizeBasedOnCPU(double cpuUtilization) throws Interru
adjustThreadPoolSize(newMaxPoolSize);
try {
// Capture the latest thread pool statistics (pool size, CPU, memory, etc.).
- WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad);
+ WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad,
+ usedHeapMemory, availableMemory, committedMemory);
// Update the write thread pool metrics with the latest statistics snapshot.
writeThreadPoolMetrics.update(stats);
} catch (Exception e) {
@@ -437,7 +441,7 @@ public synchronized boolean isMonitoringStarted() {
* @return the highest JVM CPU utilization percentage recorded
*/
@VisibleForTesting
- public double getMaxJvmCpuUtilization() {
+ public long getMaxJvmCpuUtilization() {
return maxJvmCpuUtilization;
}
@@ -500,9 +504,9 @@ public static class WriteThreadPoolStats extends ResourceUtilizationStats {
*/
public WriteThreadPoolStats(int currentPoolSize,
int maxPoolSize, int activeThreads, int idleThreads,
- double jvmCpuLoad, double systemCpuUtilization, double availableHeapGB,
- double committedHeapGB, double usedHeapGB, double maxHeapGB, double memoryLoad, String lastScaleDirection,
- double maxCpuUtilization, long jvmProcessId) {
+ long jvmCpuLoad, long systemCpuUtilization, long availableHeapGB,
+ long committedHeapGB, long usedHeapGB, long maxHeapGB, long memoryLoad, String lastScaleDirection,
+ long maxCpuUtilization, long jvmProcessId) {
super(currentPoolSize, maxPoolSize, activeThreads, idleThreads,
jvmCpuLoad, systemCpuUtilization, availableHeapGB,
committedHeapGB, usedHeapGB, maxHeapGB, memoryLoad, lastScaleDirection,
@@ -511,22 +515,28 @@ public WriteThreadPoolStats(int currentPoolSize,
}
/**
- * Returns the latest statistics for the write thread pool and system resources.
- * The snapshot includes thread counts, JVM and system CPU utilization, and the
- * current heap usage. These metrics are used for monitoring and making dynamic
- * sizing decisions for the write thread pool.
+ * Returns a snapshot of the current write thread pool and JVM/system resource
+ * statistics.
*
- * @param jvmCpuUtilization current JVM CPU usage (%)
- * @param memoryLoad current JVM memory load (used/committed)
- * @return a {@link WriteThreadPoolStats} object containing the current metrics
+ * The snapshot includes thread pool size and activity, JVM and system CPU
+ * utilization, and JVM heap memory metrics. These values are used for monitoring
+ * and for making dynamic scaling decisions for the write thread pool.
+ *
+ * @param jvmCpuUtilization current JVM CPU utilization
+ * @param memoryLoad current JVM memory load ratio (used / max)
+ * @param usedMemory current used JVM heap memory
+ * @param availableMemory current available JVM heap memory
+ * @param committedMemory current committed JVM heap memory
+ *
+ * @return a {@link WriteThreadPoolStats} instance containing the current metrics
*/
- synchronized WriteThreadPoolStats getCurrentStats(double jvmCpuUtilization,
- double memoryLoad) {
+ synchronized WriteThreadPoolStats getCurrentStats(long jvmCpuUtilization,
+ long memoryLoad, long usedMemory, long availableMemory, long committedMemory) {
if (boundedThreadPool == null) {
return new WriteThreadPoolStats(
- ZERO, ZERO, ZERO, ZERO, ZERO_D, ZERO_D, ZERO_D, ZERO_D, ZERO_D,
- ZERO_D, ZERO_D, EMPTY_STRING, ZERO_D, ZERO);
+ ZERO, ZERO, ZERO, ZERO, ZERO, ZERO, ZERO, ZERO, ZERO,
+ ZERO, ZERO, EMPTY_STRING, ZERO, ZERO);
}
ThreadPoolExecutor exec = (ThreadPoolExecutor) this.boundedThreadPool;
@@ -544,15 +554,15 @@ synchronized WriteThreadPoolStats getCurrentStats(double jvmCpuUtilization,
activeThreads, // Busy threads
idleThreads, // Idle threads
jvmCpuUtilization, // JVM CPU usage (ratio)
- ResourceUtilizationUtils.getSystemCpuLoad(), // System CPU usage (ratio)
- ResourceUtilizationUtils.getAvailableHeapMemory(), // Free heap (GB)
- ResourceUtilizationUtils.getCommittedHeapMemory(), // Committed heap (GB)
- ResourceUtilizationUtils.getUsedHeapMemory(), // Used heap (GB)
+ ResourceUtilizationUtils.getSystemCpuLoad(), // System CPU usage (ratio)
+ availableMemory, // Free heap (GB)
+ committedMemory, // Committed heap (GB)
+ usedMemory, // Used heap (GB)
ResourceUtilizationUtils.getMaxHeapMemory(), // Max heap (GB)
memoryLoad, // used/max
currentScaleDirection, // "I", "D", or ""
getMaxJvmCpuUtilization(), // Peak JVM CPU usage so far
- ResourceUtilizationUtils.getJvmProcessId() // JVM PID
+ JvmUniqueIdProvider.getJvmId() // JVM PID
);
}
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
index 43171aef07316..52fbd3182fdd5 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
@@ -1338,7 +1338,11 @@ public AbfsRestOperation read(final String path,
AbfsReadResourceUtilizationMetrics readResourceUtilizationMetrics = retrieveReadResourceUtilizationMetrics();
// If metrics are available, record them in the tracing context for diagnostics or logging.
if (readResourceUtilizationMetrics != null) {
- tracingContext.setResourceUtilizationMetricResults(readResourceUtilizationMetrics.toString());
+ String readMetrics = readResourceUtilizationMetrics.toString();
+ tracingContext.setResourceUtilizationMetricResults(readMetrics);
+ if (!readMetrics.isEmpty()) {
+ readResourceUtilizationMetrics.markPushed();
+ }
}
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = getAbfsRestOperation(
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
index cf2449ea91834..5ddb9770ac56e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
@@ -1065,7 +1065,11 @@ public AbfsRestOperation read(final String path,
AbfsReadResourceUtilizationMetrics readResourceUtilizationMetrics = retrieveReadResourceUtilizationMetrics();
// If metrics are available, record them in the tracing context for diagnostics or logging.
if (readResourceUtilizationMetrics != null) {
- tracingContext.setResourceUtilizationMetricResults(readResourceUtilizationMetrics.toString());
+ String readMetrics = readResourceUtilizationMetrics.toString();
+ tracingContext.setResourceUtilizationMetricResults(readMetrics);
+ if (!readMetrics.isEmpty()) {
+ readResourceUtilizationMetrics.markPushed();
+ }
}
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = getAbfsRestOperation(
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadResourceUtilizationMetrics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadResourceUtilizationMetrics.java
index a38dd08c5575e..82fef4f4b62fa 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadResourceUtilizationMetrics.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadResourceUtilizationMetrics.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.fs.azurebfs.services;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.azurebfs.enums.AbfsReadResourceUtilizationMetricsEnum;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
@@ -36,6 +36,36 @@ public class AbfsReadResourceUtilizationMetrics
extends
AbstractAbfsResourceUtilizationMetrics {
+ /**
+ * A version counter incremented each time a metric update occurs.
+ * Used to detect whether metrics have changed since the last serialization.
+ */
+ private final AtomicLong updateVersion = new AtomicLong(0);
+
+ /**
+ * The last version number that was serialized and pushed out.
+ */
+ private final AtomicLong lastPushedVersion = new AtomicLong(0);
+
+ @Override
+ protected boolean isUpdated() {
+ return updateVersion.get() > lastPushedVersion.get();
+ }
+
+ protected synchronized void markUpdated() {
+ updateVersion.incrementAndGet();
+ }
+
+ @Override
+ protected long getUpdateVersion() {
+ return updateVersion.get();
+ }
+
+ @Override
+ protected long getLastPushedVersion() {
+ return lastPushedVersion.get();
+ }
+
/**
* Creates a metrics set for read operations, initializing all
* metric keys defined in {@link AbfsReadResourceUtilizationMetricsEnum}.
@@ -44,6 +74,15 @@ public AbfsReadResourceUtilizationMetrics() {
super(AbfsReadResourceUtilizationMetricsEnum.values(), FSOperationType.READ.toString());
}
+ /**
+ * Marks the current metrics version as pushed.
+ * Must be called only after the metrics string is actually emitted.
+ */
+ @Override
+ public synchronized void markPushed() {
+ lastPushedVersion.set(updateVersion.get());
+ }
+
/**
* Updates all read-thread-pool metrics using the latest stats snapshot.
*
@@ -71,16 +110,16 @@ public synchronized void update(ReadBufferManagerV2.ReadThreadPoolStats stats) {
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MAX_POOL_SIZE, stats.getMaxPoolSize());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.ACTIVE_THREADS, stats.getActiveThreads());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.IDLE_THREADS, stats.getIdleThreads());
- setMetricValue(AbfsReadResourceUtilizationMetricsEnum.JVM_CPU_UTILIZATION, stats.getJvmCpuLoad() * HUNDRED_D);
- setMetricValue(AbfsReadResourceUtilizationMetricsEnum.SYSTEM_CPU_UTILIZATION, stats.getSystemCpuUtilization() * HUNDRED_D);
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.JVM_CPU_UTILIZATION, stats.getJvmCpuLoad());
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.SYSTEM_CPU_UTILIZATION, stats.getSystemCpuUtilization());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.AVAILABLE_MEMORY, stats.getMemoryUtilization());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.COMMITTED_MEMORY, stats.getCommittedHeapGB());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.USED_MEMORY, stats.getUsedHeapGB());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MAX_HEAP_MEMORY, stats.getMaxHeapGB());
- setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MEMORY_LOAD, stats.getMemoryLoad() * HUNDRED_D);
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MEMORY_LOAD, stats.getMemoryLoad());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.LAST_SCALE_DIRECTION,
stats.getLastScaleDirectionNumeric(stats.getLastScaleDirection()));
- setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MAX_CPU_UTILIZATION, stats.getMaxCpuUtilization() * HUNDRED_D);
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MAX_CPU_UTILIZATION, stats.getMaxCpuUtilization());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.JVM_PROCESS_ID, stats.getJvmProcessId());
markUpdated();
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java
index bace4421d30ad..6c44c20b31b9d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.fs.azurebfs.services;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.azurebfs.enums.AbfsWriteResourceUtilizationMetricsEnum;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
@@ -36,6 +36,17 @@ public class AbfsWriteResourceUtilizationMetrics
extends
AbstractAbfsResourceUtilizationMetrics {
+ /**
+ * A version counter incremented each time a metric update occurs.
+ * Used to detect whether metrics have changed since the last serialization.
+ */
+ private final AtomicLong updateVersion = new AtomicLong(0);
+
+ /**
+ * The last version number that was serialized and pushed out.
+ */
+ private final AtomicLong lastPushedVersion = new AtomicLong(0);
+
/**
* Creates a metrics set for write operations, pre-initializing
* all metric keys defined in {@link AbfsWriteResourceUtilizationMetricsEnum}.
@@ -44,6 +55,34 @@ public AbfsWriteResourceUtilizationMetrics() {
super(AbfsWriteResourceUtilizationMetricsEnum.values(), FSOperationType.WRITE.toString());
}
+ @Override
+ protected boolean isUpdated() {
+ return updateVersion.get() > lastPushedVersion.get();
+ }
+
+ protected void markUpdated() {
+ updateVersion.incrementAndGet();
+ }
+
+ @Override
+ protected long getUpdateVersion() {
+ return updateVersion.get();
+ }
+
+ @Override
+ protected long getLastPushedVersion() {
+ return lastPushedVersion.get();
+ }
+
+ /**
+ * Marks the current metrics version as pushed.
+ * Must be called only after the metrics string is actually emitted.
+ */
+ @Override
+ public synchronized void markPushed() {
+ lastPushedVersion.set(updateVersion.get());
+ }
+
/**
* Updates all write-thread-pool metrics using the latest stats snapshot.
* Each field in {@link WriteThreadPoolSizeManager.WriteThreadPoolStats}
@@ -60,19 +99,18 @@ public synchronized void update(WriteThreadPoolSizeManager.WriteThreadPoolStats
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_POOL_SIZE, stats.getMaxPoolSize());
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.ACTIVE_THREADS, stats.getActiveThreads());
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.IDLE_THREADS, stats.getIdleThreads());
- setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.JVM_CPU_UTILIZATION, stats.getJvmCpuLoad() * HUNDRED_D);
- setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.SYSTEM_CPU_UTILIZATION, stats.getSystemCpuUtilization() * HUNDRED_D);
+ setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.JVM_CPU_UTILIZATION, stats.getJvmCpuLoad());
+ setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.SYSTEM_CPU_UTILIZATION, stats.getSystemCpuUtilization());
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.AVAILABLE_MEMORY, stats.getMemoryUtilization());
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.COMMITTED_MEMORY, stats.getCommittedHeapGB());
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.USED_MEMORY, stats.getUsedHeapGB());
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_HEAP_MEMORY, stats.getMaxHeapGB());
- setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MEMORY_LOAD, stats.getMemoryLoad() * HUNDRED_D);
+ setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MEMORY_LOAD, stats.getMemoryLoad());
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.LAST_SCALE_DIRECTION,
stats.getLastScaleDirectionNumeric(stats.getLastScaleDirection()));
- setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_CPU_UTILIZATION, stats.getMaxCpuUtilization() * HUNDRED_D);
+ setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_CPU_UTILIZATION, stats.getMaxCpuUtilization());
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.JVM_PROCESS_ID, stats.getJvmProcessId());
markUpdated();
}
}
-
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbstractAbfsResourceUtilizationMetrics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbstractAbfsResourceUtilizationMetrics.java
index 2e5a172e62455..4a080d07214ad 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbstractAbfsResourceUtilizationMetrics.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbstractAbfsResourceUtilizationMetrics.java
@@ -19,19 +19,19 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.azurebfs.enums.AbfsReadResourceUtilizationMetricsEnum;
import org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum;
import org.apache.hadoop.fs.azurebfs.enums.AbfsResourceUtilizationMetricsEnum;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.CHAR_DOLLAR;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
@@ -47,21 +47,10 @@ public abstract class AbstractAbfsResourceUtilizationMetrics &
private static final Logger LOG = LoggerFactory.getLogger(
AbstractAbfsResourceUtilizationMetrics.class);
- /**
- * Tracks whether any metric has been updated at least once.
- */
- private final AtomicBoolean updatedAtLeastOnce = new AtomicBoolean(false);
-
- /**
- * A version counter incremented each time a metric update occurs.
- * Used to detect whether metrics have changed since the last serialization.
- */
- private final AtomicLong updateVersion = new AtomicLong(0);
-
- /**
- * The last version number that was serialized and pushed out.
- */
- private final AtomicLong lastPushedVersion = new AtomicLong(-1);
+ protected abstract boolean isUpdated();
+ protected abstract long getUpdateVersion();
+ protected abstract long getLastPushedVersion();
+ public abstract void markPushed();
/**
* The set of metrics supported by this metrics instance.
@@ -128,56 +117,65 @@ protected void setMetricValue(T metric, double value) {
}
}
- /**
- * Marks that a metric update has occurred.
- * Increments the version so consumers know that new data is available.
- */
- protected void markUpdated() {
- updatedAtLeastOnce.set(true);
- updateVersion.incrementAndGet();
- }
-
- /**
- * Returns a flag indicating whether any metric has been updated since initialization.
- *
- * @return the {@link AtomicBoolean} tracking whether at least one update occurred
- */
- public boolean getUpdatedAtLeastOnce() {
- return updatedAtLeastOnce.get();
- }
-
- /**
- * Serializes the current metrics to a compact string format suitable for logs.
- * @return a serialized metrics string or an empty string if no updates occurred
- */
@Override
public String toString() {
- if (!updatedAtLeastOnce.get()) {
+ if (!isUpdated()) {
return EMPTY_STRING;
}
- long currentVersion = updateVersion.get();
- if (currentVersion == lastPushedVersion.get()) {
+ long currentVersion = getUpdateVersion();
+ if (currentVersion == getLastPushedVersion()) {
return EMPTY_STRING;
}
- synchronized (this) {
- if (currentVersion == lastPushedVersion.get()) {
- return EMPTY_STRING;
- }
+ StringBuilder sb = new StringBuilder(operationType).append(CHAR_EQUALS);
- StringBuilder sb = new StringBuilder(operationType).append(CHAR_EQUALS);
+ for (T metric : metrics) {
+ long value = lookupGaugeValue(metric.getName());
- for (T metric : metrics) {
+ if (isMemoryMetric(metric.getName())) {
sb.append(metric.getName())
.append(CHAR_EQUALS)
- .append(lookupGaugeValue(metric.getName()))
+ .append(convertMemoryValue(value))
+ .append(CHAR_DOLLAR);
+ } else {
+ sb.append(metric.getName())
+ .append(CHAR_EQUALS)
+ .append(value)
.append(CHAR_DOLLAR);
}
-
- lastPushedVersion.set(currentVersion);
- return sb.toString();
}
+
+ return sb.toString();
+ }
+
+ /**
+ * Determines whether the given metric represents a JVM heap memory metric.
+ *
+ * Memory metrics are identified by their names as defined in
+ * {@link AbfsReadResourceUtilizationMetricsEnum}.
+ *
+ * @param metricName the metric name
+ * @return {@code true} if the metric is a memory metric, {@code false} otherwise
+ */
+ private boolean isMemoryMetric(String metricName) {
+ return metricName.equals(
+ AbfsReadResourceUtilizationMetricsEnum.AVAILABLE_MEMORY.getName())
+ || metricName.equals(
+ AbfsReadResourceUtilizationMetricsEnum.COMMITTED_MEMORY.getName())
+ || metricName.equals(
+ AbfsReadResourceUtilizationMetricsEnum.USED_MEMORY.getName())
+ || metricName.equals(
+ AbfsReadResourceUtilizationMetricsEnum.MAX_HEAP_MEMORY.getName());
}
-}
+ /**
+ * Converts a raw memory metric value to a decimal representation for logging.
+ *
+ * @param value the raw memory value
+ * @return the converted memory value
+ */
+ private double convertMemoryValue(long value) {
+ return value / HUNDRED_D;
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
index 98ef0bc1a1985..6877d5a03a913 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
@@ -128,7 +128,11 @@ protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
// Fetches write thread pool metrics from the ABFS client and adds them to the tracing context.
AbfsWriteResourceUtilizationMetrics writeResourceUtilizationMetrics = getWriteResourceUtilizationMetrics();
if (writeResourceUtilizationMetrics != null) {
- tracingContextAppend.setResourceUtilizationMetricResults(writeResourceUtilizationMetrics.toString());
+ String writeMetrics = writeResourceUtilizationMetrics.toString();
+ tracingContextAppend.setResourceUtilizationMetricResults(writeMetrics);
+ if (!writeMetrics.isEmpty()) {
+ writeResourceUtilizationMetrics.markPushed();
+ }
}
try {
LOG.trace("Starting remote write for block with ID {} and offset {}",
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
index ccf50808eb580..bc5e6b7c521eb 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
@@ -120,7 +120,11 @@ protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
// Fetches write thread pool metrics from the ABFS client and adds them to the tracing context.
AbfsWriteResourceUtilizationMetrics writeResourceUtilizationMetrics = getWriteResourceUtilizationMetrics();
if (writeResourceUtilizationMetrics != null) {
- tracingContextAppend.setResourceUtilizationMetricResults(writeResourceUtilizationMetrics.toString());
+ String writeMetrics = writeResourceUtilizationMetrics.toString();
+ tracingContextAppend.setResourceUtilizationMetricResults(writeMetrics);
+ if (!writeMetrics.isEmpty()) {
+ writeResourceUtilizationMetrics.markPushed();
+ }
}
String threadIdStr = String.valueOf(Thread.currentThread().getId());
if (tracingContextAppend.getIngressHandler().equals(EMPTY_STRING)) {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
index 4664f1fb26ea8..8db94cc3e4cd2 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
@@ -113,7 +113,11 @@ protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
// Fetches write thread pool metrics from the ABFS client and adds them to the tracing context.
AbfsWriteResourceUtilizationMetrics writeResourceUtilizationMetrics = getWriteResourceUtilizationMetrics();
if (writeResourceUtilizationMetrics != null) {
- tracingContextAppend.setResourceUtilizationMetricResults(writeResourceUtilizationMetrics.toString());
+ String writeMetrics = writeResourceUtilizationMetrics.toString();
+ tracingContextAppend.setResourceUtilizationMetricResults(writeMetrics);
+ if (!writeMetrics.isEmpty()) {
+ writeResourceUtilizationMetrics.markPushed();
+ }
}
String threadIdStr = String.valueOf(Thread.currentThread().getId());
tracingContextAppend.setIngressHandler(FALLBACK_APPEND + " T " + threadIdStr);
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
index d64c7246c5b33..071c1b1684955 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.fs.azurebfs.services;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.JvmUniqueIdProvider;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
import java.io.IOException;
@@ -51,7 +52,6 @@
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_UP_AT_MAX;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_UP;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO_D;
/**
* The Improved Read Buffer Manager for Rest AbfsClient.
@@ -68,7 +68,7 @@ public final class ReadBufferManagerV2 extends ReadBufferManager {
private static int cpuMonitoringIntervalInMilliSec;
- private static double cpuThreshold;
+ private static long cpuThreshold;
private static int threadPoolUpscalePercentage;
@@ -94,7 +94,7 @@ public final class ReadBufferManagerV2 extends ReadBufferManager {
private static int memoryMonitoringIntervalInMilliSec;
- private static double memoryThreshold;
+ private static long memoryThreshold;
private final AtomicInteger numberOfActiveBuffers = new AtomicInteger(0);
@@ -116,7 +116,7 @@ public final class ReadBufferManagerV2 extends ReadBufferManager {
/* Tracks the last scale direction applied, or empty if none. */
private volatile String lastScaleDirection = EMPTY_STRING;
/* Maximum CPU utilization observed during the monitoring interval. */
- private volatile double maxJvmCpuUtilization = 0.0;
+ private volatile long maxJvmCpuUtilization = 0L;
/**
* Private constructor to prevent instantiation as this needs to be singleton.
@@ -171,8 +171,7 @@ public static void setReadBufferManagerConfigs(final int readAheadBlockSize,
maxThreadPoolSize = abfsConfiguration.getMaxReadAheadV2ThreadPoolSize();
cpuMonitoringIntervalInMilliSec
= abfsConfiguration.getReadAheadV2CpuMonitoringIntervalMillis();
- cpuThreshold = abfsConfiguration.getReadAheadV2CpuUsageThresholdPercent()
- / HUNDRED_D;
+ cpuThreshold = abfsConfiguration.getReadAheadV2CpuUsageThresholdPercent();
threadPoolUpscalePercentage
= abfsConfiguration.getReadAheadV2ThreadPoolUpscalePercentage();
threadPoolDownscalePercentage
@@ -185,8 +184,7 @@ public static void setReadBufferManagerConfigs(final int readAheadBlockSize,
memoryMonitoringIntervalInMilliSec
= abfsConfiguration.getReadAheadV2MemoryMonitoringIntervalMillis();
memoryThreshold =
- abfsConfiguration.getReadAheadV2MemoryUsageThresholdPercent()
- / HUNDRED_D;
+ abfsConfiguration.getReadAheadV2MemoryUsageThresholdPercent();
setThresholdAgeMilliseconds(
abfsConfiguration.getReadAheadV2CachedBufferTTLMillis());
isDynamicScalingEnabled
@@ -854,7 +852,7 @@ private boolean manualEviction(final ReadBuffer buf) {
*/
private void adjustThreadPool() {
int currentPoolSize = workerRefs.size();
- double cpuLoad = ResourceUtilizationUtils.getJvmCpuLoad();
+ long cpuLoad = ResourceUtilizationUtils.getJvmCpuLoad();
if (cpuLoad > maxJvmCpuUtilization) {
maxJvmCpuUtilization = cpuLoad;
}
@@ -1096,7 +1094,7 @@ public ScheduledExecutorService getCpuMonitoringThread() {
* @return the highest JVM CPU utilization percentage recorded
*/
@VisibleForTesting
- public double getMaxJvmCpuUtilization() {
+ public long getMaxJvmCpuUtilization() {
return maxJvmCpuUtilization;
}
@@ -1168,10 +1166,10 @@ public static class ReadThreadPoolStats extends ResourceUtilizationStats {
*/
public ReadThreadPoolStats(int currentPoolSize,
int maxPoolSize, int activeThreads, int idleThreads,
- double jvmCpuLoad,
- double systemCpuUtilization, double availableHeapGB,
- double committedHeapGB, double usedHeapGB, double maxHeapGB, double memoryLoad,
- String lastScaleDirection, double maxCpuUtilization, long jvmProcessId) {
+ long jvmCpuLoad,
+ long systemCpuUtilization, long availableHeapGB,
+ long committedHeapGB, long usedHeapGB, long maxHeapGB, long memoryLoad,
+ String lastScaleDirection, long maxCpuUtilization, long jvmProcessId) {
super(currentPoolSize, maxPoolSize, activeThreads, idleThreads,
jvmCpuLoad, systemCpuUtilization, availableHeapGB,
committedHeapGB, usedHeapGB, maxHeapGB, memoryLoad, lastScaleDirection,
@@ -1189,10 +1187,10 @@ public ReadThreadPoolStats(int currentPoolSize,
* @return a {@link ReadThreadPoolStats} object containing the current thread pool
* and system resource statistics
*/
- synchronized ReadThreadPoolStats getCurrentStats(double jvmCpuLoad) {
+ synchronized ReadThreadPoolStats getCurrentStats(long jvmCpuLoad) {
if (workerPool == null) {
- return new ReadThreadPoolStats(ZERO, ZERO, ZERO, ZERO, ZERO_D, ZERO_D,
- ZERO_D, ZERO_D, ZERO_D, ZERO_D, ZERO_D, EMPTY_STRING, ZERO_D, ZERO);
+ return new ReadThreadPoolStats(ZERO, ZERO, ZERO, ZERO, ZERO, ZERO,
+ ZERO, ZERO, ZERO, ZERO, ZERO, EMPTY_STRING, ZERO, ZERO);
}
ThreadPoolExecutor exec = this.workerPool;
@@ -1217,7 +1215,7 @@ synchronized ReadThreadPoolStats getCurrentStats(double jvmCpuLoad) {
ResourceUtilizationUtils.getMemoryLoad(), // used/max
currentScaleDirection, // "I", "D", or ""
getMaxJvmCpuUtilization(), // Peak JVM CPU usage so far,
- ResourceUtilizationUtils.getJvmProcessId() // JVM process id.
+ JvmUniqueIdProvider.getJvmId() // JVM process id.
);
}
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java
index 4fa0712e76563..6ddd2e9f6f2d0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.fs.azurebfs.services;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.NO_ACTION_NEEDED;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.NO_SCALE_DOWN_AT_MIN;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.NO_SCALE_UP_AT_MAX;
@@ -40,15 +40,15 @@ public abstract class ResourceUtilizationStats {
private final int maxPoolSize; // Maximum allowed pool size
private final int activeThreads; // Number of threads currently executing tasks
private final int idleThreads; // Number of threads not executing tasks
- private final double jvmCpuLoad; // Current JVM CPU utilization (%)
- private final double systemCpuUtilization; // Current system CPU utilization (%)
- private final double availableHeapGB; // Available heap memory (GB)
- private final double committedHeapGB; // Total committed heap memory (GB)
- private final double usedHeapGB; // Used heap memory (GB)
- private final double maxHeapGB; // Max heap memory (GB)
- private final double memoryLoad; // Heap usage ratio (used/max)
- private final String lastScaleDirection; // Last resize direction: "I" (increase) or "D" (decrease)
- private final double maxCpuUtilization; // Peak JVM CPU observed in the current interval
+ private final long jvmCpuLoad; // Current JVM CPU utilization (%)
+ private final long systemCpuUtilization; // Current system CPU utilization (%)
+ private final long availableHeapGB; // Available heap memory (GB)
+ private final long committedHeapGB; // Total committed heap memory (GB)
+ private final long usedHeapGB; // Used heap memory (GB)
+ private final long maxHeapGB; // Max heap memory (GB)
+ private final long memoryLoad; // Heap usage ratio (used/max)
+ private String lastScaleDirection = EMPTY_STRING; // Last resize direction: "I" (increase) or "D" (decrease)
+ private long maxCpuUtilization = 0L; // Peak JVM CPU observed in the current interval
private final long jvmProcessId; // JVM Process ID
/**
@@ -73,9 +73,9 @@ public abstract class ResourceUtilizationStats {
*/
public ResourceUtilizationStats(int currentPoolSize,
int maxPoolSize, int activeThreads, int idleThreads,
- double jvmCpuLoad, double systemCpuUtilization, double availableHeapGB,
- double committedHeapGB, double usedHeapGB, double maxHeapGB, double memoryLoad, String lastScaleDirection,
- double maxCpuUtilization, long jvmProcessId) {
+ long jvmCpuLoad, long systemCpuUtilization, long availableHeapGB,
+ long committedHeapGB, long usedHeapGB, long maxHeapGB, long memoryLoad, String lastScaleDirection,
+ long maxCpuUtilization, long jvmProcessId) {
this.currentPoolSize = currentPoolSize;
this.maxPoolSize = maxPoolSize;
this.activeThreads = activeThreads;
@@ -113,32 +113,32 @@ public int getIdleThreads() {
}
/** @return the overall system CPU utilization percentage. */
- public double getSystemCpuUtilization() {
+ public long getSystemCpuUtilization() {
return systemCpuUtilization;
}
/** @return the available heap memory in gigabytes. */
- public double getMemoryUtilization() {
+ public long getMemoryUtilization() {
return availableHeapGB;
}
/** @return the total committed heap memory in gigabytes */
- public double getCommittedHeapGB() {
+ public long getCommittedHeapGB() {
return committedHeapGB;
}
/** @return the used heap memory in gigabytes */
- public double getUsedHeapGB() {
+ public long getUsedHeapGB() {
return usedHeapGB;
}
/** @return the max heap memory in gigabytes */
- public double getMaxHeapGB() {
+ public long getMaxHeapGB() {
return maxHeapGB;
}
/** @return the current JVM memory load (used / committed) as a value between 0.0 and 1.0 */
- public double getMemoryLoad() {
+ public long getMemoryLoad() {
return memoryLoad;
}
@@ -148,12 +148,12 @@ public String getLastScaleDirection() {
}
/** @return the JVM process CPU utilization percentage. */
- public double getJvmCpuLoad() {
+ public long getJvmCpuLoad() {
return jvmCpuLoad;
}
/** @return the max JVM process CPU utilization percentage. */
- public double getMaxCpuUtilization() {
+ public long getMaxCpuUtilization() {
return maxCpuUtilization;
}
@@ -190,13 +190,13 @@ public int getLastScaleDirectionNumeric(String lastScaleDirection) {
public String toString() {
return String.format(
"currentPoolSize=%d, maxPoolSize=%d, activeThreads=%d, idleThreads=%d, "
- + "jvmCpuLoad=%.2f%%, systemCpuUtilization=%.2f%%, "
- + "availableHeap=%.2fGB, committedHeap=%.2fGB, memoryLoad=%.2f%%, "
- + "scaleDirection=%s, maxCpuUtilization=%.2f%%, jvmProcessId=%d",
+ + "jvmCpuLoad=%d, systemCpuUtilization=%d, "
+ + "availableHeap=%d, committedHeap=%d, memoryLoad=%d, "
+ + "scaleDirection=%s, maxCpuUtilization=%d, jvmProcessId=%d",
currentPoolSize, maxPoolSize, activeThreads,
- idleThreads, jvmCpuLoad * HUNDRED_D, systemCpuUtilization * HUNDRED_D,
- availableHeapGB, committedHeapGB, memoryLoad * HUNDRED_D,
- lastScaleDirection, maxCpuUtilization * HUNDRED_D, jvmProcessId
+ idleThreads, jvmCpuLoad, systemCpuUtilization,
+ availableHeapGB, committedHeapGB, memoryLoad,
+ lastScaleDirection, maxCpuUtilization, jvmProcessId
);
}
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/ResourceUtilizationUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/ResourceUtilizationUtils.java
index c151a483b1853..c6bda87c3a00e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/ResourceUtilizationUtils.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/ResourceUtilizationUtils.java
@@ -27,139 +27,140 @@
import org.apache.hadoop.classification.VisibleForTesting;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BYTES_PER_GIGABYTE;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO_D;
/**
* Utility class for retrieving JVM- and system-level resource utilization
* metrics such as CPU load, memory usage, and available heap memory.
+ * All metrics are returned as long values with 2-decimal precision stored as integer (scaled by 100).
*/
public final class ResourceUtilizationUtils {
+ private static final long SCALE_FACTOR = 100L; // 2 decimal places
+
private ResourceUtilizationUtils() {
// Prevent instantiation
}
/**
- * Calculates the available heap memory in gigabytes.
- * This method uses {@link Runtime#getRuntime()} to obtain the maximum heap memory
- * allowed for the JVM and subtracts the currently used memory (total - free)
- * to determine how much heap memory is still available.
- * The result is rounded up to the nearest gigabyte.
+ * Scales a double value by {@link #SCALE_FACTOR} to store 2-decimal precision as long.
+ *
+ * @param value value to scale
+ * @return scaled long value
+ */
+ private static long scale(double value) {
+ return Math.round(value * SCALE_FACTOR);
+ }
+
+ /**
+ * Returns the available heap memory in gigabytes, calculated as the difference between
+ * the committed heap and used heap memory.
+ *
+ * The result is scaled by 100 for 2-decimal precision.
+ *
*
- * @return the available heap memory in gigabytes
+ * @return available heap memory in GB (scaled by 100)
*/
public static long getAvailableHeapMemory() {
- MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
- MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
- long availableHeapBytes = memoryUsage.getCommitted() - memoryUsage.getUsed();
- return (availableHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
+ MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+ double gb = (mu.getCommitted() - mu.getUsed()) / (double) BYTES_PER_GIGABYTE;
+ return scale(gb);
}
/**
- * Returns the currently committed JVM heap memory in bytes.
- * This reflects the amount of heap the JVM has reserved from the OS and may grow as needed.
+ * Returns the JVM heap memory currently committed.
+ *
+ * Committed memory is the amount of memory guaranteed to be available for the JVM.
+ *
*
- * @return committed heap memory in bytes
+ * @return committed heap memory in GB (scaled by 100)
*/
@VisibleForTesting
- public static double getCommittedHeapMemory() {
+ public static long getCommittedHeapMemory() {
MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
- return (double) memoryUsage.getCommitted() / BYTES_PER_GIGABYTE;
+ double gb = memoryUsage.getCommitted() / (double) BYTES_PER_GIGABYTE;
+ return scale(gb);
}
/**
- * Get the current CPU load of the system.
- * @return the CPU load as a double value between 0.0 and 1.0
+ * Returns the system-wide CPU load as a fraction (scaled by 100 for 2-decimal precision).
+ *
+ * The value ranges between 0 (no load) and 100 (full load). Returns 0 if CPU load cannot be obtained.
+ *
+ *
+ * @return system CPU load (scaled by 100)
*/
@VisibleForTesting
- public static double getSystemCpuLoad() {
- OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
- OperatingSystemMXBean.class);
+ public static long getSystemCpuLoad() {
+ OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
double cpuLoad = osBean.getSystemCpuLoad();
if (cpuLoad < 0) {
- // If the CPU load is not available, return 0.0
- return 0.0;
+ return 0L;
}
- return cpuLoad;
+ return scale(cpuLoad);
}
-
/**
- * Gets the current system CPU utilization.
+ * Returns the JVM process CPU load as a fraction (scaled by 100 for 2-decimal precision).
+ *
+ * The value ranges between 0 (no load) and 100 (full CPU used by this process). Returns 0 if CPU load cannot be obtained.
+ *
*
- * @return the CPU utilization as a fraction (0.0 to 1.0), or 0.0 if unavailable.
+ * @return JVM process CPU load (scaled by 100)
*/
@VisibleForTesting
- public static double getJvmCpuLoad() {
- OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
- OperatingSystemMXBean.class);
+ public static long getJvmCpuLoad() {
+ OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
double cpuLoad = osBean.getProcessCpuLoad();
- if (cpuLoad < ZERO) {
- return ZERO_D;
+ if (cpuLoad < 0) {
+ return 0L;
}
- return cpuLoad;
+ return scale(cpuLoad);
}
/**
- * Get the current memory load of the JVM.
- * @return the memory load as a double value between 0.0 and 1.0
+ * Returns the heap memory usage as a fraction of max heap (scaled by 100).
+ *
+ * @return memory load (used/max heap) scaled by 100
*/
@VisibleForTesting
- public static double getMemoryLoad() {
+ public static long getMemoryLoad() {
MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
- return (double) memoryUsage.getUsed() / memoryUsage.getMax();
+ double memLoad = (double) memoryUsage.getUsed() / memoryUsage.getMax();
+ return scale(memLoad);
}
/**
- * Calculates the used heap memory in gigabytes.
- * This method returns the amount of heap memory currently used by the JVM.
- * The result is rounded up to the nearest gigabyte.
+ * Returns the currently used heap memory in gigabytes.
*
- * @return the used heap memory in gigabytes
+ * @return used heap memory in GB (scaled by 100)
*/
public static long getUsedHeapMemory() {
- MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
- MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
- long usedHeapBytes = memoryUsage.getUsed();
- return (usedHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
+ MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+ double gb = mu.getUsed() / (double) BYTES_PER_GIGABYTE;
+ return scale(gb);
}
/**
- * Calculates the maximum heap memory allowed for the JVM in gigabytes.
- * This is the upper bound the JVM may expand its heap to.
+ * Returns the maximum heap memory that the JVM can use.
*
- * @return the maximum heap memory in gigabytes
+ * @return max heap memory in GB (scaled by 100)
*/
public static long getMaxHeapMemory() {
- MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
- MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
- long maxHeapBytes = memoryUsage.getMax();
- return (maxHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
- }
-
-
- /**
- * Returns the process ID (PID) of the currently running JVM.
- * This method uses {@link ProcessHandle#current()} to obtain the ID of the
- * Java process.
- *
- * @return the PID of the current JVM process
- */
- public static long getJvmProcessId() {
- return ProcessHandle.current().pid();
+ MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+ double gb = mu.getMax() / (double) BYTES_PER_GIGABYTE;
+ return scale(gb);
}
/**
- * Calculates the available max heap memory in gigabytes.
- * This method uses {@link Runtime#getRuntime()} to obtain the maximum heap memory
- * allowed for the JVM and subtracts the currently used memory (total - free)
- * to determine how much heap memory is still available.
- * The result is rounded up to the nearest gigabyte.
+ * Returns the available heap memory relative to the max heap.
+ *
+ * This method calculates the difference between max heap and currently used heap,
+ * then converts it to gigabytes rounded up.
+ *
*
- * @return the available heap memory in gigabytes
+ * @return available heap memory in GB (rounded up)
*/
public static long getAvailableMaxHeapMemory() {
MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
@@ -168,3 +169,4 @@ public static long getAvailableMaxHeapMemory() {
return (availableHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
}
}
+
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
index a332c30bc032d..735b3f276ba5f 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
@@ -26,9 +26,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@@ -56,8 +60,8 @@
class TestWriteThreadPoolSizeManager extends AbstractAbfsIntegrationTest {
private AbfsConfiguration mockConfig;
- private static final double HIGH_CPU_UTILIZATION_THRESHOLD = 0.95;
- private static final double LOW_CPU_UTILIZATION_THRESHOLD = 0.05;
+ private static final long HIGH_CPU_UTILIZATION_THRESHOLD = 95;
+ private static final long LOW_CPU_UTILIZATION_THRESHOLD = 5;
private static final int LOW_MEMORY_USAGE_THRESHOLD_PERCENT = 100;
private static final int THREAD_SLEEP_DURATION_MS = 200;
private static final String TEST_FILE_PATH = "testFilePath";
@@ -75,8 +79,8 @@ class TestWriteThreadPoolSizeManager extends AbstractAbfsIntegrationTest {
private static final int WAIT_DURATION_MS = 3000;
private static final int LATCH_TIMEOUT_SECONDS = 60;
private static final int RESIZE_WAIT_TIME_MS = 6_000;
- private static final double HIGH_CPU_USAGE_RATIO = 0.95;
- private static final double LOW_CPU_USAGE_RATIO = 0.05;
+ private static final long HIGH_CPU_USAGE_RATIO = 95;
+ private static final long LOW_CPU_USAGE_RATIO = 5;
private static final int SLEEP_DURATION_MS = 150;
private static final int AWAIT_TIMEOUT_SECONDS = 45;
private static final int RESIZER_JOIN_TIMEOUT_MS = 2_000;
@@ -178,7 +182,7 @@ void testAdjustThreadPoolSizeBasedOnLowCPU()
try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) {
WriteThreadPoolSizeManager instance
= WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
- getAbfsStore(abfs).getAbfsConfiguration(),
+ mockConfig,
abfs.getAbfsClient().getAbfsCounters());
ExecutorService executor = instance.getExecutorService();
int initialSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
@@ -833,7 +837,11 @@ void testThreadPoolOnLowCpuLoadAndMetricsUpdate()
.getAbfsWriteResourceUtilizationMetrics();
WriteThreadPoolSizeManager.WriteThreadPoolStats statsBefore =
- instance.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad(), ResourceUtilizationUtils.getMemoryLoad());
+ instance.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad(),
+ ResourceUtilizationUtils.getMemoryLoad(),
+ ResourceUtilizationUtils.getUsedHeapMemory(),
+ ResourceUtilizationUtils.getAvailableHeapMemory(),
+ ResourceUtilizationUtils.getCommittedHeapMemory());
ThreadPoolExecutor executor =
(ThreadPoolExecutor) instance.getExecutorService();
@@ -868,7 +876,11 @@ void testThreadPoolOnLowCpuLoadAndMetricsUpdate()
Thread.sleep(SLEEP_DURATION_30S_MS);
WriteThreadPoolSizeManager.WriteThreadPoolStats statsAfter =
- instance.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad(), ResourceUtilizationUtils.getMemoryLoad());
+ instance.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad(),
+ ResourceUtilizationUtils.getMemoryLoad(),
+ ResourceUtilizationUtils.getUsedHeapMemory(),
+ ResourceUtilizationUtils.getAvailableHeapMemory(),
+ ResourceUtilizationUtils.getCommittedHeapMemory());
//--- Validate that metrics and stats changed ---
Assertions.assertThat(statsAfter)
@@ -898,5 +910,54 @@ void testThreadPoolOnLowCpuLoadAndMetricsUpdate()
instance.close();
}
}
+
+ /**
+ * Verifies that the JVM identifier is initialized once and remains
+ * constant across multiple invocations within the same JVM process.
+ */
+ @Test
+ public void testJvmIdIsSingletonWithinJvm() {
+ int firstId = JvmUniqueIdProvider.getJvmId();
+ int secondId = JvmUniqueIdProvider.getJvmId();
+ int thirdId = JvmUniqueIdProvider.getJvmId();
+
+ assertEquals(firstId, secondId,
+ "Subsequent calls to getJvmId() should return the same value");
+ assertEquals(secondId, thirdId,
+ "JVM-scoped identifier must remain constant for the lifetime of the JVM");
+ }
+
+ /**
+ * Verifies that the JVM identifier is safely shared across multiple threads
+ * and that concurrent access returns the same value.
+ *
+ * This test ensures that static initialization of the identifier is
+ * thread-safe and occurs only once per JVM.
+ */
+ @Test
+ public void testJvmIdIsSameAcrossThreads()
+ throws ExecutionException, InterruptedException {
+
+ ExecutorService executor = Executors.newFixedThreadPool(4);
+
+ try {
+ Callable task = JvmUniqueIdProvider::getJvmId;
+ Future f1 = executor.submit(task);
+ Future f2 = executor.submit(task);
+ Future f3 = executor.submit(task);
+ Future f4 = executor.submit(task);
+
+ int expectedId = f1.get();
+ assertEquals(expectedId, f2.get(),
+ "JVM ID should be identical when accessed from different threads");
+ assertEquals(expectedId, f3.get(),
+ "JVM ID should be identical when accessed concurrently");
+ assertEquals(expectedId, f4.get(),
+ "JVM ID should be initialized once and shared across all threads");
+ } finally {
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
index 77ae7ff71dfde..e94c535bd3900 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
@@ -342,7 +342,7 @@ public void testReadMetricUpdation() throws Exception {
.as("Thread pool stats should update after CPU load")
.isNotEqualTo(statsBefore);
- boolean updatedMetrics = metrics.getUpdatedAtLeastOnce();
+ boolean updatedMetrics = metrics.isUpdated();
Assertions.assertThat(updatedMetrics)
.as("Metrics should be updated at least once after CPU load")