Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.micrometer.core.instrument.internal.TimedScheduledExecutorService;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
Expand Down Expand Up @@ -56,6 +57,11 @@
@NonNullFields
public class ExecutorServiceMetrics implements MeterBinder {

private static final String CLASS_NAME_THREAD_PER_TASK_EXECUTOR = "java.util.concurrent.ThreadPerTaskExecutor";

@Nullable
private static final Method METHOD_THREAD_COUNT_FROM_THREAD_PER_TASK_EXECUTOR = getMethodForThreadCountFromThreadPerTaskExecutor();

private static boolean allowIllegalReflectiveAccess = true;

private static final InternalLogger log = InternalLoggerFactory.getInstance(ExecutorServiceMetrics.class);
Expand Down Expand Up @@ -315,6 +321,9 @@ else if (className.equals("java.util.concurrent.Executors$FinalizableDelegatedEx
monitor(registry,
unwrapThreadPoolExecutor(executorService, executorService.getClass().getSuperclass()));
}
else if (className.equals(CLASS_NAME_THREAD_PER_TASK_EXECUTOR)) {
monitorThreadPerTaskExecutor(registry, executorService);
}
else {
log.warn("Failed to bind as {} is unsupported.", className);
}
Expand Down Expand Up @@ -439,6 +448,39 @@ private void monitor(MeterRegistry registry, ForkJoinPool fj) {
registeredMeterIds.addAll(meters.stream().map(Meter::getId).collect(toSet()));
}

private void monitorThreadPerTaskExecutor(MeterRegistry registry, ExecutorService executorService) {
List<Meter> meters = asList(Gauge
.builder(metricPrefix + "executor.active", executorService,
ExecutorServiceMetrics::getThreadCountFromThreadPerTaskExecutor)
.tags(tags)
.description("The approximate number of threads that are actively executing tasks")
.baseUnit(BaseUnits.THREADS)
.register(registry));
registeredMeterIds.addAll(meters.stream().map(Meter::getId).collect(toSet()));
}

private static long getThreadCountFromThreadPerTaskExecutor(ExecutorService executorService) {
try {
return (long) METHOD_THREAD_COUNT_FROM_THREAD_PER_TASK_EXECUTOR.invoke(executorService);
}
catch (Throwable e) {
throw new RuntimeException(e);
}
}

@Nullable
private static Method getMethodForThreadCountFromThreadPerTaskExecutor() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to generate a java.lang.invoke.MethodHandle once (maybe static) and use it (the same one, cached) in every call? Maybe it would be better for performance reasons

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@juliojgd Thanks for the feedback!

I'm not sure how easy implementing this with the MethodHandle would be, but it's not a hot spot, so I think it's okay as is.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, the gauge will call the method every N seconds, right? And the method in turn will make the reflective call each time, so maybe it could be a hotspot (due to the number of calls made during the JVM instance life).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@juliojgd I'm not familiar with the MethodHandle, so I'm not sure how effective it is here, but I attempted to switch to it in e5d159b.

try {
Class<?> clazz = Class.forName(CLASS_NAME_THREAD_PER_TASK_EXECUTOR);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could work

        Class<?> clazz = Class.forName(CLASS_NAME_THREAD_PER_TASK_EXECUTOR);
        // Get private access to the class
        MethodHandles.Lookup privateLookup = MethodHandles.privateLookupIn(clazz, MethodHandles.lookup());
        // Find the private method
        return privateLookup.findVirtual(clazz, "threadCount", MethodType.methodType(long.class));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with methodhandles in theory the accesscheck occurs only once (during the lookup) but with plain reflection the access check occurs in every invocation. At least this is my understanding

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@juliojgd MethodHandles.privateLookupIn() requires JDK 9+, but Micrometer needs to work on JDK 8, so we can't call it directly.

Method method = clazz.getMethod("threadCount");
method.setAccessible(true);
return method;
}
catch (Throwable e) {
return null;
}
}

/**
* Disable illegal reflective accesses.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.concurrent.*;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

/**
* Tests for {@link ExecutorServiceMetrics} with reflection enabled.
*
* @author Tommy Ludwig
* @author Johnny Lim
*/
@Tag("reflective")
class ExecutorServiceMetricsReflectiveTests {
Expand All @@ -45,4 +48,25 @@ void threadPoolMetricsWith_AutoShutdownDelegatedExecutorService() throws Interru
assertThat(registry.get("executor.completed").tag("name", "test").functionCounter().count()).isEqualTo(1L);
}

@Test
void monitorWithExecutorsNewVirtualThreadPerTaskExecutor() {
CountDownLatch latch = new CountDownLatch(1);
ExecutorService unmonitored = Executors.newVirtualThreadPerTaskExecutor();
assertThat(unmonitored.getClass().getName()).isEqualTo("java.util.concurrent.ThreadPerTaskExecutor");
ExecutorService monitored = ExecutorServiceMetrics.monitor(registry, unmonitored, "test");
monitored.execute(() -> {
try {
latch.await(1, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
await().atMost(Duration.ofSeconds(1))
.untilAsserted(() -> assertThat(registry.get("executor.active").gauge().value()).isEqualTo(1));
latch.countDown();
await().atMost(Duration.ofSeconds(1))
.untilAsserted(() -> assertThat(registry.get("executor.active").gauge().value()).isEqualTo(0));
}

}