Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a unit test for this, asserting the singleton behavior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added


import java.util.Random;

/**
 * Supplies a single identifier that stays fixed for the life of the running JVM.
 *
 * <p>The value is computed exactly once, while this class is being
 * initialized: a small random offset is added to the current wall-clock time
 * in milliseconds, and the sum is folded into a fixed numeric window. Every
 * call to {@link #getJvmId()} afterwards returns that same value.</p>
 *
 * <p>The identifier is meant for lightweight tagging (for example of metrics
 * or log entries). Uniqueness across JVM instances is best-effort only; two
 * JVMs can end up with the same value.</p>
 *
 * <p>This is a static utility holder and cannot be instantiated.</p>
 */
public final class JvmUniqueIdProvider {

  /** Smallest value the identifier can take (inclusive). */
  private static final int MIN_JVM_ID = 100_000;

  /** Number of distinct values in the identifier window. */
  private static final int JVM_ID_RANGE = 900_000;

  /** Exclusive upper bound of the random offset added to the timestamp. */
  private static final int RANDOM_ENTROPY_BOUND = 1_000;

  /** Identifier computed once, when this class is initialized. */
  private static final int JVM_UNIQUE_ID = computeJvmId();

  /** Not instantiable: all members are static. */
  private JvmUniqueIdProvider() {
  }

  /**
   * Builds the identifier from the current time plus a random offset.
   *
   * <p>For a non-negative clock reading the result lies in
   * {@code [MIN_JVM_ID, MIN_JVM_ID + JVM_ID_RANGE)}.</p>
   *
   * @return the freshly computed identifier
   */
  private static int computeJvmId() {
    final long nowMillis = System.currentTimeMillis();
    final int jitter = new Random().nextInt(RANDOM_ENTROPY_BOUND);
    // Fold the jittered timestamp into the window, then shift it past the minimum.
    final long offsetInWindow = (nowMillis + jitter) % JVM_ID_RANGE;
    return (int) offsetInWindow + MIN_JVM_ID;
  }

  /**
   * Returns the identifier computed for this JVM.
   *
   * @return the same value on every call within one JVM
   */
  public static int getJvmId() {
    return JVM_UNIQUE_ID;
  }
}

Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MEDIUM_HEAP_SPACE_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_REDUCTION_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_POOL_SIZE_INCREASE_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR;
Expand All @@ -55,7 +54,6 @@
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_UP;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THIRTY_SECONDS;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO_D;

/**
* Manages a thread pool for writing operations, adjusting the pool size based on CPU utilization.
Expand Down Expand Up @@ -91,11 +89,11 @@ public final class WriteThreadPoolSizeManager implements Closeable {
/* Tracks the last scale direction applied, or empty if none. */
private volatile String lastScaleDirection = EMPTY_STRING;
/* Maximum CPU utilization observed during the monitoring interval. */
private volatile double maxJvmCpuUtilization = 0.0;
private volatile long maxJvmCpuUtilization = 0L;
/** High memory usage threshold used to trigger thread pool downscaling. */
private final double highMemoryThreshold;
private final long highMemoryThreshold;
/** Low memory usage threshold used to allow thread pool upscaling. */
private final double lowMemoryThreshold;
private final long lowMemoryThreshold;

/**
* Private constructor to initialize the write thread pool and CPU monitor executor
Expand Down Expand Up @@ -136,8 +134,8 @@ private WriteThreadPoolSizeManager(String filesystemName,
executor.allowCoreThreadTimeOut(true);
/* Create a scheduled executor for CPU monitoring and pool adjustment */
this.cpuMonitorExecutor = Executors.newScheduledThreadPool(1);
highMemoryThreshold = abfsConfiguration.getWriteHighMemoryUsageThresholdPercent() / HUNDRED_D;
lowMemoryThreshold = abfsConfiguration.getWriteLowMemoryUsageThresholdPercent() / HUNDRED_D;
highMemoryThreshold = abfsConfiguration.getWriteHighMemoryUsageThresholdPercent();
lowMemoryThreshold = abfsConfiguration.getWriteLowMemoryUsageThresholdPercent();
}

/** Returns the internal {@link AbfsConfiguration}. */
Expand Down Expand Up @@ -245,7 +243,7 @@ public synchronized void startCPUMonitoring() {
if (!isMonitoringStarted()) {
isMonitoringStarted = true;
cpuMonitorExecutor.scheduleAtFixedRate(() -> {
double cpuUtilization = ResourceUtilizationUtils.getJvmCpuLoad();
long cpuUtilization = ResourceUtilizationUtils.getJvmCpuLoad();
LOG.debug("Current CPU Utilization is this: {}", cpuUtilization);
try {
adjustThreadPoolSizeBasedOnCPU(cpuUtilization);
Expand All @@ -266,24 +264,27 @@ public synchronized void startCPUMonitoring() {
* @param cpuUtilization Current CPU utilization as a whole-number percentage (0 to 100)
* @throws InterruptedException if the resizing operation is interrupted while acquiring the lock
*/
public void adjustThreadPoolSizeBasedOnCPU(double cpuUtilization) throws InterruptedException {
public void adjustThreadPoolSizeBasedOnCPU(long cpuUtilization) throws InterruptedException {
lock.lock();
try {
ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool;
int currentPoolSize = executor.getMaximumPoolSize();
double memoryLoad = ResourceUtilizationUtils.getMemoryLoad();
long memoryLoad = ResourceUtilizationUtils.getMemoryLoad();
long usedHeapMemory = ResourceUtilizationUtils.getUsedHeapMemory();
long availableMemory = ResourceUtilizationUtils.getAvailableHeapMemory();
long committedMemory = ResourceUtilizationUtils.getCommittedHeapMemory();
LOG.debug("The memory load is {} and CPU utilization is {}", memoryLoad, cpuUtilization);
if (cpuUtilization > (abfsConfiguration.getWriteHighCpuThreshold()/HUNDRED_D)) {
if (cpuUtilization > (abfsConfiguration.getWriteHighCpuThreshold())) {
newMaxPoolSize = calculateReducedPoolSizeHighCPU(currentPoolSize, memoryLoad);
if (currentPoolSize == initialPoolSize && newMaxPoolSize == initialPoolSize) {
lastScaleDirection = SCALE_DIRECTION_NO_DOWN_AT_MIN;
}
} else if (cpuUtilization > (abfsConfiguration.getWriteMediumCpuThreshold()/HUNDRED_D)) {
} else if (cpuUtilization > (abfsConfiguration.getWriteMediumCpuThreshold())) {
newMaxPoolSize = calculateReducedPoolSizeMediumCPU(currentPoolSize, memoryLoad);
if (currentPoolSize == initialPoolSize && newMaxPoolSize == initialPoolSize) {
lastScaleDirection = SCALE_DIRECTION_NO_DOWN_AT_MIN;
}
} else if (cpuUtilization < (abfsConfiguration.getWriteLowCpuThreshold()/HUNDRED_D)) {
} else if (cpuUtilization < (abfsConfiguration.getWriteLowCpuThreshold())) {
newMaxPoolSize = calculateIncreasedPoolSizeLowCPU(currentPoolSize, memoryLoad);
if (currentPoolSize == maxThreadPoolSize && newMaxPoolSize == maxThreadPoolSize) {
lastScaleDirection = SCALE_DIRECTION_NO_UP_AT_MAX;
Expand All @@ -294,7 +295,8 @@ public void adjustThreadPoolSizeBasedOnCPU(double cpuUtilization) throws Interru
}
boolean willResize = newMaxPoolSize != currentPoolSize;
if (!willResize && !lastScaleDirection.equals(EMPTY_STRING)) {
WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad);
WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad,
usedHeapMemory, availableMemory, committedMemory);
// Update the write thread pool metrics with the latest statistics snapshot.
writeThreadPoolMetrics.update(stats);
}
Expand All @@ -304,7 +306,8 @@ public void adjustThreadPoolSizeBasedOnCPU(double cpuUtilization) throws Interru
if (!willResize) {
try {
// Capture the latest thread pool statistics (pool size, CPU, memory, etc.).
WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad);
WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad,
usedHeapMemory, availableMemory, committedMemory);
// Update the write thread pool metrics with the latest statistics snapshot.
writeThreadPoolMetrics.update(stats);
} catch (Exception e) {
Expand All @@ -320,7 +323,8 @@ public void adjustThreadPoolSizeBasedOnCPU(double cpuUtilization) throws Interru
adjustThreadPoolSize(newMaxPoolSize);
try {
// Capture the latest thread pool statistics (pool size, CPU, memory, etc.).
WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad);
WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad,
usedHeapMemory, availableMemory, committedMemory);
// Update the write thread pool metrics with the latest statistics snapshot.
writeThreadPoolMetrics.update(stats);
} catch (Exception e) {
Expand Down Expand Up @@ -437,7 +441,7 @@ public synchronized boolean isMonitoringStarted() {
* @return the highest JVM CPU utilization percentage recorded
*/
@VisibleForTesting
public double getMaxJvmCpuUtilization() {
public long getMaxJvmCpuUtilization() {
return maxJvmCpuUtilization;
}

Expand Down Expand Up @@ -500,9 +504,9 @@ public static class WriteThreadPoolStats extends ResourceUtilizationStats {
*/
public WriteThreadPoolStats(int currentPoolSize,
int maxPoolSize, int activeThreads, int idleThreads,
double jvmCpuLoad, double systemCpuUtilization, double availableHeapGB,
double committedHeapGB, double usedHeapGB, double maxHeapGB, double memoryLoad, String lastScaleDirection,
double maxCpuUtilization, long jvmProcessId) {
long jvmCpuLoad, long systemCpuUtilization, long availableHeapGB,
long committedHeapGB, long usedHeapGB, long maxHeapGB, long memoryLoad, String lastScaleDirection,
long maxCpuUtilization, long jvmProcessId) {
super(currentPoolSize, maxPoolSize, activeThreads, idleThreads,
jvmCpuLoad, systemCpuUtilization, availableHeapGB,
committedHeapGB, usedHeapGB, maxHeapGB, memoryLoad, lastScaleDirection,
Expand All @@ -511,22 +515,28 @@ public WriteThreadPoolStats(int currentPoolSize,
}

/**
* Returns the latest statistics for the write thread pool and system resources.
* The snapshot includes thread counts, JVM and system CPU utilization, and the
* current heap usage. These metrics are used for monitoring and making dynamic
* sizing decisions for the write thread pool.
* Returns a snapshot of the current write thread pool and JVM/system resource
* statistics.
*
* @param jvmCpuUtilization current JVM CPU usage (%)
* @param memoryLoad current JVM memory load (used/committed)
* @return a {@link WriteThreadPoolStats} object containing the current metrics
* <p>The snapshot includes thread pool size and activity, JVM and system CPU
* utilization, and JVM heap memory metrics. These values are used for monitoring
* and for making dynamic scaling decisions for the write thread pool.</p>
*
* @param jvmCpuUtilization current JVM CPU utilization
* @param memoryLoad current JVM memory load ratio (used / max)
* @param usedMemory current used JVM heap memory
* @param availableMemory current available JVM heap memory
* @param committedMemory current committed JVM heap memory
*
* @return a {@link WriteThreadPoolStats} instance containing the current metrics
*/
synchronized WriteThreadPoolStats getCurrentStats(double jvmCpuUtilization,
double memoryLoad) {
synchronized WriteThreadPoolStats getCurrentStats(long jvmCpuUtilization,
long memoryLoad, long usedMemory, long availableMemory, long committedMemory) {

if (boundedThreadPool == null) {
return new WriteThreadPoolStats(
ZERO, ZERO, ZERO, ZERO, ZERO_D, ZERO_D, ZERO_D, ZERO_D, ZERO_D,
ZERO_D, ZERO_D, EMPTY_STRING, ZERO_D, ZERO);
ZERO, ZERO, ZERO, ZERO, ZERO, ZERO, ZERO, ZERO, ZERO,
ZERO, ZERO, EMPTY_STRING, ZERO, ZERO);
}

ThreadPoolExecutor exec = (ThreadPoolExecutor) this.boundedThreadPool;
Expand All @@ -544,15 +554,15 @@ synchronized WriteThreadPoolStats getCurrentStats(double jvmCpuUtilization,
activeThreads, // Busy threads
idleThreads, // Idle threads
jvmCpuUtilization, // JVM CPU usage (ratio)
ResourceUtilizationUtils.getSystemCpuLoad(), // System CPU usage (ratio)
ResourceUtilizationUtils.getAvailableHeapMemory(), // Free heap (GB)
ResourceUtilizationUtils.getCommittedHeapMemory(), // Committed heap (GB)
ResourceUtilizationUtils.getUsedHeapMemory(), // Used heap (GB)
ResourceUtilizationUtils.getSystemCpuLoad(), // System CPU usage (ratio)
availableMemory, // Free heap (GB)
committedMemory, // Committed heap (GB)
usedMemory, // Used heap (GB)
ResourceUtilizationUtils.getMaxHeapMemory(), // Max heap (GB)
memoryLoad, // used/max
currentScaleDirection, // "I", "D", or ""
getMaxJvmCpuUtilization(), // Peak JVM CPU usage so far
ResourceUtilizationUtils.getJvmProcessId() // JVM PID
JvmUniqueIdProvider.getJvmId() // JVM PID
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1338,7 +1338,11 @@ public AbfsRestOperation read(final String path,
AbfsReadResourceUtilizationMetrics readResourceUtilizationMetrics = retrieveReadResourceUtilizationMetrics();
// If metrics are available, record them in the tracing context for diagnostics or logging.
if (readResourceUtilizationMetrics != null) {
tracingContext.setResourceUtilizationMetricResults(readResourceUtilizationMetrics.toString());
String readMetrics = readResourceUtilizationMetrics.toString();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this logic, we can send the same metrics data in two different read calls. Is this expected?
Should we move these lines under synchronized block?

String readMetrics = readResourceUtilizationMetrics.toString();
tracingContext.setResourceUtilizationMetricResults(readMetrics);
if (!readMetrics.isEmpty()) {
        readResourceUtilizationMetrics.markPushed();
      }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The toString() method already takes updateVersion and lastPushedVersion into account: it only emits the metrics when updateVersion.get() > lastPushedVersion.get(), which prevents the same metric from being pushed more than once. In addition, markPushed() is synchronized.

tracingContext.setResourceUtilizationMetricResults(readMetrics);
if (!readMetrics.isEmpty()) {
readResourceUtilizationMetrics.markPushed();
}
}
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = getAbfsRestOperation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,11 @@ public AbfsRestOperation read(final String path,
AbfsReadResourceUtilizationMetrics readResourceUtilizationMetrics = retrieveReadResourceUtilizationMetrics();
// If metrics are available, record them in the tracing context for diagnostics or logging.
if (readResourceUtilizationMetrics != null) {
tracingContext.setResourceUtilizationMetricResults(readResourceUtilizationMetrics.toString());
String readMetrics = readResourceUtilizationMetrics.toString();
tracingContext.setResourceUtilizationMetricResults(readMetrics);
if (!readMetrics.isEmpty()) {
readResourceUtilizationMetrics.markPushed();
}
}
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = getAbfsRestOperation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.hadoop.fs.azurebfs.services;

import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.fs.azurebfs.enums.AbfsReadResourceUtilizationMetricsEnum;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
Expand All @@ -36,6 +36,36 @@ public class AbfsReadResourceUtilizationMetrics
extends
AbstractAbfsResourceUtilizationMetrics<AbfsReadResourceUtilizationMetricsEnum> {

/**
* A version counter incremented each time a metric update occurs.
* Used to detect whether metrics have changed since the last serialization.
*/
private final AtomicLong updateVersion = new AtomicLong(0);

/**
* The last version number that was serialized and pushed out.
*/
private final AtomicLong lastPushedVersion = new AtomicLong(0);

/**
 * Reports whether metrics have changed since they were last marked as pushed.
 *
 * @return {@code true} when the update version is strictly greater than the
 *         last pushed version
 */
@Override
protected boolean isUpdated() {
  final long currentVersion = updateVersion.get();
  final long pushedVersion = lastPushedVersion.get();
  return currentVersion > pushedVersion;
}

/**
 * Records that the metrics changed by advancing the update version by one.
 */
protected synchronized void markUpdated() {
  // Only the side effect matters here; the returned value is not used.
  updateVersion.getAndIncrement();
}

/**
 * Returns the current value of the update version counter.
 *
 * @return the update version at the time of the call
 */
@Override
protected long getUpdateVersion() {
  final long currentVersion = updateVersion.get();
  return currentVersion;
}

/**
 * Returns the version that was most recently marked as pushed.
 *
 * @return the last pushed version at the time of the call
 */
@Override
protected long getLastPushedVersion() {
  final long pushedVersion = lastPushedVersion.get();
  return pushedVersion;
}

/**
* Creates a metrics set for read operations, initializing all
* metric keys defined in {@link AbfsReadResourceUtilizationMetricsEnum}.
Expand All @@ -44,6 +74,15 @@ public AbfsReadResourceUtilizationMetrics() {
super(AbfsReadResourceUtilizationMetricsEnum.values(), FSOperationType.READ.toString());
}

/**
 * Records the current update version as the last pushed version.
 * Call this only once the metrics string has actually been emitted.
 *
 * <p>NOTE(review): the version stored is the one current when this method
 * runs, which is not necessarily the one that was serialized. An update
 * landing between serialization and this call would also be treated as
 * pushed; confirm that callers tolerate this.</p>
 */
@Override
public synchronized void markPushed() {
  final long emittedVersion = updateVersion.get();
  lastPushedVersion.set(emittedVersion);
}

/**
* Updates all read-thread-pool metrics using the latest stats snapshot.
* <p>
Expand Down Expand Up @@ -71,16 +110,16 @@ public synchronized void update(ReadBufferManagerV2.ReadThreadPoolStats stats) {
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MAX_POOL_SIZE, stats.getMaxPoolSize());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.ACTIVE_THREADS, stats.getActiveThreads());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.IDLE_THREADS, stats.getIdleThreads());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.JVM_CPU_UTILIZATION, stats.getJvmCpuLoad() * HUNDRED_D);
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.SYSTEM_CPU_UTILIZATION, stats.getSystemCpuUtilization() * HUNDRED_D);
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.JVM_CPU_UTILIZATION, stats.getJvmCpuLoad());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.SYSTEM_CPU_UTILIZATION, stats.getSystemCpuUtilization());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.AVAILABLE_MEMORY, stats.getMemoryUtilization());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.COMMITTED_MEMORY, stats.getCommittedHeapGB());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.USED_MEMORY, stats.getUsedHeapGB());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MAX_HEAP_MEMORY, stats.getMaxHeapGB());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MEMORY_LOAD, stats.getMemoryLoad() * HUNDRED_D);
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MEMORY_LOAD, stats.getMemoryLoad());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.LAST_SCALE_DIRECTION,
stats.getLastScaleDirectionNumeric(stats.getLastScaleDirection()));
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MAX_CPU_UTILIZATION, stats.getMaxCpuUtilization() * HUNDRED_D);
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MAX_CPU_UTILIZATION, stats.getMaxCpuUtilization());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.JVM_PROCESS_ID, stats.getJvmProcessId());

markUpdated();
Expand Down
Loading