From 43a26ac61b7c420177afc2ac4c639632be5ec1bb Mon Sep 17 00:00:00 2001 From: Carpe-Wang Date: Thu, 19 Jun 2025 21:34:46 -0400 Subject: [PATCH 1/3] feat: Add dedicated thread pool for RestTemplate in alert notification system --- .../alert/notice/AlertNoticeDispatch.java | 45 ++++++++++++++----- .../impl/AbstractAlertNotifyHandlerImpl.java | 19 ++++++++ .../impl/WebHookAlertNotifyHandlerImpl.java | 1 + .../alert/notice/AlertNoticeDispatchTest.java | 8 +++- .../manager/config/RestTemplateConfig.java | 16 ++++++- 5 files changed, 77 insertions(+), 12 deletions(-) diff --git a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatch.java b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatch.java index 6d9207216b7..745f4153e79 100644 --- a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatch.java +++ b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatch.java @@ -21,8 +21,11 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import lombok.extern.slf4j.Slf4j; import org.apache.hertzbeat.alert.AlerterWorkerPool; +import org.springframework.beans.factory.annotation.Qualifier; import org.apache.hertzbeat.alert.config.AlertSseManager; import org.apache.hertzbeat.common.entity.alerter.GroupAlert; import org.apache.hertzbeat.common.entity.alerter.NoticeReceiver; @@ -48,15 +51,20 @@ public class AlertNoticeDispatch { private final Map alertNotifyHandlerMap; private final PluginRunner pluginRunner; private final AlertSseManager emitterManager; + private final Executor restTemplateThreadPool; public AlertNoticeDispatch(AlerterWorkerPool workerPool, NoticeConfigService noticeConfigService, AlertStoreHandler alertStoreHandler, - List alertNotifyHandlerList, PluginRunner pluginRunner, AlertSseManager emitterManager) { + List alertNotifyHandlerList, + PluginRunner pluginRunner, + AlertSseManager emitterManager, + @Qualifier("restTemplateThreadPool") Executor restTemplateThreadPool) { this.workerPool = workerPool; this.noticeConfigService = noticeConfigService; this.alertStoreHandler = alertStoreHandler; this.pluginRunner = pluginRunner; + this.restTemplateThreadPool = restTemplateThreadPool; alertNotifyHandlerMap = Maps.newHashMapWithExpectedSize(alertNotifyHandlerList.size()); this.emitterManager = emitterManager; alertNotifyHandlerList.forEach(r -> alertNotifyHandlerMap.put(r.type(), r)); @@ -121,14 +129,31 @@ public void dispatchAlarm(GroupAlert groupAlert) { } private void sendNotify(GroupAlert alert) { - matchNoticeRulesByAlert(alert).ifPresent(noticeRules -> noticeRules.forEach(rule -> workerPool.executeNotify(() -> rule.getReceiverId() - .forEach(receiverId -> { - try { - sendNoticeMsg(getOneReceiverById(receiverId), - getOneTemplateById(rule.getTemplateId()), alert); - } catch (AlertNoticeException e) { - log.warn("DispatchTask sendNoticeMsg error, message: {}", e.getMessage()); - } - })))); + matchNoticeRulesByAlert(alert).ifPresent(noticeRules -> noticeRules.forEach(rule -> workerPool.executeNotify(() -> { + List> futures = rule.getReceiverId().stream() + .map(receiverId -> sendNoticeAsync(getOneReceiverById(receiverId), + getOneTemplateById(rule.getTemplateId()), alert)) + .toList(); + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .whenComplete((result, exception) -> { + if (exception != null) { + log.warn("Some async notifications failed", exception); + } else { + log.debug("All notifications completed for alert: {}", alert.getGroupLabels()); + } + }); + }))); + } + + private CompletableFuture sendNoticeAsync(NoticeReceiver receiver, NoticeTemplate template, GroupAlert alert) { + return CompletableFuture.runAsync(() -> { + try { + sendNoticeMsg(receiver, template, alert); + } catch (AlertNoticeException e) { + log.warn("Async notification failed for receiver {}: {}", receiver.getName(), e.getMessage()); + throw new RuntimeException(e); + } + }, restTemplateThreadPool); } } diff --git a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/AbstractAlertNotifyHandlerImpl.java b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/AbstractAlertNotifyHandlerImpl.java index a37b9a26843..bdc856e1e4d 100644 --- a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/AbstractAlertNotifyHandlerImpl.java +++ b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/AbstractAlertNotifyHandlerImpl.java @@ -26,6 +26,8 @@ import java.util.Locale; import java.util.Map; import java.util.ResourceBundle; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import lombok.extern.slf4j.Slf4j; import org.apache.hertzbeat.alert.AlerterProperties; import org.apache.hertzbeat.common.entity.alerter.GroupAlert; @@ -34,6 +36,7 @@ import org.apache.hertzbeat.common.util.ResourceBundleUtil; import org.apache.hertzbeat.alert.notice.AlertNotifyHandler; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.event.EventListener; import org.springframework.ui.freemarker.FreeMarkerTemplateUtils; import org.springframework.web.client.RestTemplate; @@ -50,6 +53,9 @@ abstract class AbstractAlertNotifyHandlerImpl implements AlertNotifyHandler { protected RestTemplate restTemplate; @Autowired protected AlerterProperties alerterProperties; + @Autowired + @Qualifier("restTemplateThreadPool") + protected Executor restTemplateThreadPool; protected String renderContent(NoticeTemplate noticeTemplate, GroupAlert alert) throws TemplateException, IOException { @@ -113,6 +119,19 @@ protected String escapeJsonStr(String jsonStr){ return sb.toString(); } + protected CompletableFuture sendAsync(org.apache.hertzbeat.common.entity.alerter.NoticeReceiver receiver, + org.apache.hertzbeat.common.entity.alerter.NoticeTemplate noticeTemplate, + GroupAlert alert) { + return CompletableFuture.runAsync(() -> { + try { + send(receiver, noticeTemplate, alert); + } catch (Exception e) { + log.error("Async alert notification failed", e); + throw new RuntimeException(e); + } + }, restTemplateThreadPool); + } + @EventListener(SystemConfigChangeEvent.class) public void onEvent(SystemConfigChangeEvent event) { log.info("{} receive system config change event: {}.", this.getClass().getName(), event.getSource()); diff --git a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/WebHookAlertNotifyHandlerImpl.java b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/WebHookAlertNotifyHandlerImpl.java index 54d56e532f7..3b342513ccd 100644 --- a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/WebHookAlertNotifyHandlerImpl.java +++ b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/WebHookAlertNotifyHandlerImpl.java @@ -17,6 +17,7 @@ package org.apache.hertzbeat.alert.notice.impl; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.hertzbeat.alert.notice.AlertNoticeException; import org.apache.hertzbeat.common.entity.alerter.GroupAlert; diff --git a/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatchTest.java b/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatchTest.java index 8102afcdc0b..3f289f6bbb5 100644 --- a/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatchTest.java +++ b/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatchTest.java @@ -24,6 +24,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.concurrent.Executor; + import java.util.Collections; import java.util.List; import org.apache.hertzbeat.alert.AlerterWorkerPool; @@ -64,6 +66,9 @@ class AlertNoticeDispatchTest { @Mock private AlertSseManager emitterManager; + @Mock + private Executor restTemplateThreadPool; + private AlertNoticeDispatch alertNoticeDispatch; private static final int DISPATCH_THREADS = 3; @@ -82,7 +87,8 @@ void setUp() { alertStoreHandler, alertNotifyHandlerList, pluginRunner, - emitterManager + emitterManager, + restTemplateThreadPool ); receiver = NoticeReceiver.builder() diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/config/RestTemplateConfig.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/config/RestTemplateConfig.java index d856c1d9b78..fbd7437c997 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/config/RestTemplateConfig.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/config/RestTemplateConfig.java @@ -18,6 +18,8 @@ package org.apache.hertzbeat.manager.config; import java.util.Collections; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import okhttp3.ConnectionPool; import okhttp3.OkHttpClient; @@ -26,11 +28,11 @@ import org.springframework.context.annotation.Configuration; import org.springframework.http.client.ClientHttpRequestFactory; import org.springframework.http.client.OkHttp3ClientHttpRequestFactory; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.web.client.RestTemplate; /** * restTemplate config - * todo thread pool */ @Configuration public class RestTemplateConfig { @@ -58,4 +60,16 @@ public ClientHttpRequestFactory simpleClientHttpRequestFactory() { ); } + @Bean("restTemplateThreadPool") + public Executor restTemplateThreadPool() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(10); + executor.setMaxPoolSize(50); + executor.setQueueCapacity(200); + executor.setThreadNamePrefix("RestTemplate-"); + executor.setKeepAliveSeconds(60); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } } From 956a073875a538b9271466f80af7624727666692 Mon Sep 17 00:00:00 2001 From: Carpe-Wang Date: Fri, 20 Jun 2025 13:12:33 -0400 Subject: [PATCH 2/3] checkstyle --- .../alert/notice/impl/WebHookAlertNotifyHandlerImpl.java | 1 - .../apache/hertzbeat/alert/notice/AlertNoticeDispatchTest.java | 1 - 2 files changed, 2 deletions(-) diff --git a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/WebHookAlertNotifyHandlerImpl.java b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/WebHookAlertNotifyHandlerImpl.java index 3b342513ccd..54d56e532f7 100644 --- a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/WebHookAlertNotifyHandlerImpl.java +++ b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/WebHookAlertNotifyHandlerImpl.java @@ -17,7 +17,6 @@ package org.apache.hertzbeat.alert.notice.impl; -import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.hertzbeat.alert.notice.AlertNoticeException; import org.apache.hertzbeat.common.entity.alerter.GroupAlert; diff --git a/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatchTest.java b/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatchTest.java index 3f289f6bbb5..b85477fd87c 100644 --- a/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatchTest.java +++ b/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatchTest.java @@ -25,7 +25,6 @@ import static org.mockito.Mockito.when; import java.util.concurrent.Executor; - import java.util.Collections; import java.util.List; import org.apache.hertzbeat.alert.AlerterWorkerPool; From d54e188352d27b1df0f38fd946d96b040b88312e Mon Sep 17 00:00:00 2001 From: Carpe-Wang Date: Fri, 20 Jun 2025 23:02:52 -0400 Subject: [PATCH 3/3] change according review --- .../alert/notice/AlertNoticeDispatch.java | 34 +++++-------------- .../impl/AbstractAlertNotifyHandlerImpl.java | 18 ---------- .../manager/config/RestTemplateConfig.java | 20 +++++------ 3 files changed, 19 insertions(+), 53 deletions(-) diff --git a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatch.java b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatch.java index 745f4153e79..6e36a8e593e 100644 --- a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatch.java +++ b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatch.java @@ -21,7 +21,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import lombok.extern.slf4j.Slf4j; import org.apache.hertzbeat.alert.AlerterWorkerPool; @@ -130,30 +129,15 @@ public void dispatchAlarm(GroupAlert groupAlert) { private void sendNotify(GroupAlert alert) { matchNoticeRulesByAlert(alert).ifPresent(noticeRules -> noticeRules.forEach(rule -> workerPool.executeNotify(() -> { - List> futures = rule.getReceiverId().stream() - .map(receiverId -> sendNoticeAsync(getOneReceiverById(receiverId), - getOneTemplateById(rule.getTemplateId()), alert)) - .toList(); - - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) - .whenComplete((result, exception) -> { - if (exception != null) { - log.warn("Some async notifications failed", exception); - } else { - log.debug("All notifications completed for alert: {}", alert.getGroupLabels()); - } - }); + rule.getReceiverId().forEach(receiverId -> { + restTemplateThreadPool.execute(() -> { + try { + sendNoticeMsg(getOneReceiverById(receiverId), getOneTemplateById(rule.getTemplateId()), alert); + } catch (Exception e) { + log.warn("Async notification failed for receiver {}: {}", receiverId, e.getMessage()); + } + }); + }); }))); } - - private CompletableFuture sendNoticeAsync(NoticeReceiver receiver, NoticeTemplate template, GroupAlert alert) { - return CompletableFuture.runAsync(() -> { - try { - sendNoticeMsg(receiver, template, alert); - } catch (AlertNoticeException e) { - log.warn("Async notification failed for receiver {}: {}", receiver.getName(), e.getMessage()); - throw new RuntimeException(e); - } - }, restTemplateThreadPool); - } } diff --git a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/AbstractAlertNotifyHandlerImpl.java b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/AbstractAlertNotifyHandlerImpl.java index bdc856e1e4d..ed3b140f9b9 100644 --- a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/AbstractAlertNotifyHandlerImpl.java +++ b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/AbstractAlertNotifyHandlerImpl.java @@ -26,7 +26,6 @@ import java.util.Locale; import java.util.Map; import java.util.ResourceBundle; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import lombok.extern.slf4j.Slf4j; import org.apache.hertzbeat.alert.AlerterProperties; @@ -53,10 +52,6 @@ abstract class AbstractAlertNotifyHandlerImpl implements AlertNotifyHandler { protected RestTemplate restTemplate; @Autowired protected AlerterProperties alerterProperties; - @Autowired - @Qualifier("restTemplateThreadPool") - protected Executor restTemplateThreadPool; - protected String renderContent(NoticeTemplate noticeTemplate, GroupAlert alert) throws TemplateException, IOException { StringTemplateLoader stringLoader = new StringTemplateLoader(); @@ -119,19 +114,6 @@ protected String escapeJsonStr(String jsonStr){ return sb.toString(); } - protected CompletableFuture sendAsync(org.apache.hertzbeat.common.entity.alerter.NoticeReceiver receiver, - org.apache.hertzbeat.common.entity.alerter.NoticeTemplate noticeTemplate, - GroupAlert alert) { - return CompletableFuture.runAsync(() -> { - try { - send(receiver, noticeTemplate, alert); - } catch (Exception e) { - log.error("Async alert notification failed", e); - throw new RuntimeException(e); - } - }, restTemplateThreadPool); - } - @EventListener(SystemConfigChangeEvent.class) public void onEvent(SystemConfigChangeEvent event) { log.info("{} receive system config change event: {}.", this.getClass().getName(), event.getSource()); diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/config/RestTemplateConfig.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/config/RestTemplateConfig.java index fbd7437c997..1c3f7cbe817 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/config/RestTemplateConfig.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/config/RestTemplateConfig.java @@ -19,6 +19,7 @@ import java.util.Collections; import java.util.concurrent.Executor; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import okhttp3.ConnectionPool; @@ -28,7 +29,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.http.client.ClientHttpRequestFactory; import org.springframework.http.client.OkHttp3ClientHttpRequestFactory; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.web.client.RestTemplate; /** @@ -62,14 +62,14 @@ public ClientHttpRequestFactory simpleClientHttpRequestFactory() { @Bean("restTemplateThreadPool") public Executor restTemplateThreadPool() { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(10); - executor.setMaxPoolSize(50); - executor.setQueueCapacity(200); - executor.setThreadNamePrefix("RestTemplate-"); - executor.setKeepAliveSeconds(60); - executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); - executor.initialize(); - return executor; + return new ThreadPoolExecutor( + 2, + Integer.MAX_VALUE, + 60L, + TimeUnit.SECONDS, + new SynchronousQueue<>(), + r -> new Thread(r, "RestTemplate-" + r.hashCode()), + new ThreadPoolExecutor.CallerRunsPolicy() + ); } }