Skip to content

Commit 88347b5

Browse files
HzjNeverStop致节
andauthored
add runnable/callable class in async invoke method (#1327)
Co-authored-by: 致节 <[email protected]>
1 parent 5473c76 commit 88347b5

File tree

4 files changed

+197
-87
lines changed

4 files changed

+197
-87
lines changed

sofa-boot-project/sofa-boot-actuator/src/main/java/com/alipay/sofa/boot/actuator/health/HealthCheckerProcessor.java

Lines changed: 69 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.List;
3434
import java.util.Map;
3535
import java.util.Optional;
36+
import java.util.concurrent.Callable;
3637
import java.util.concurrent.CopyOnWriteArrayList;
3738
import java.util.concurrent.CountDownLatch;
3839
import java.util.concurrent.ExecutorService;
@@ -111,19 +112,8 @@ public boolean readinessHealthCheck(Map<String, Health> healthMap) {
111112
if (isParallelCheck()) {
112113
CountDownLatch countDownLatch = new CountDownLatch(readinessHealthCheckers.size());
113114
AtomicBoolean parallelResult = new AtomicBoolean(true);
114-
readinessHealthCheckers.forEach((String key, HealthChecker value) -> healthCheckExecutor.execute(() -> {
115-
try {
116-
if (!doHealthCheck(key, value, false, healthMap, true, false)) {
117-
parallelResult.set(false);
118-
}
119-
} catch (Throwable t) {
120-
parallelResult.set(false);
121-
logger.error(ErrorCode.convert("01-22004"), t);
122-
healthMap.put(key, new Health.Builder().withException(t).status(Status.DOWN).build());
123-
} finally {
124-
countDownLatch.countDown();
125-
}
126-
}));
115+
readinessHealthCheckers.forEach((String key, HealthChecker value) -> healthCheckExecutor.execute(
116+
new AsyncHealthCheckRunnable(key, value, healthMap, parallelResult, countDownLatch)));
127117
boolean finished = false;
128118
try {
129119
finished = countDownLatch.await(getParallelCheckTimeout(), TimeUnit.MILLISECONDS);
@@ -160,7 +150,7 @@ public boolean readinessHealthCheck(Map<String, Health> healthMap) {
160150
* @return health check passes or not
161151
*/
162152
private boolean doHealthCheck(String beanId, HealthChecker healthChecker, boolean isRetry,
163-
Map<String, Health> healthMap, boolean isReadiness, boolean wait) {
153+
Map<String, Health> healthMap, boolean isReadiness, boolean wait) {
164154
Assert.notNull(healthMap, "HealthMap must not be null");
165155

166156
Health health;
@@ -180,20 +170,23 @@ private boolean doHealthCheck(String beanId, HealthChecker healthChecker, boolea
180170
do {
181171
try {
182172
if (wait) {
183-
Future<Health> future = healthCheckExecutor.submit(healthChecker::isHealthy);
173+
Future<Health> future = healthCheckExecutor
174+
.submit(new AsyncHealthCheckCallable(healthChecker));
184175
health = future.get(timeout, TimeUnit.MILLISECONDS);
185176
} else {
186177
health = healthChecker.isHealthy();
187178
}
188-
} catch (TimeoutException e) {
189-
logger.error(
179+
} catch (TimeoutException e) {
180+
logger
181+
.error(
190182
"Timeout occurred while doing HealthChecker[{}] {} check, the timeout value is: {}ms.",
191183
beanId, checkType, timeout);
192-
health = new Health.Builder().withException(e).withDetail("timeout", timeout).status(Status.UNKNOWN).build();
184+
health = new Health.Builder().withException(e).withDetail("timeout", timeout)
185+
.status(Status.UNKNOWN).build();
193186
} catch (Throwable e) {
194187
logger.error(String.format(
195-
"Exception occurred while wait the result of HealthChecker[%s] %s check.",
196-
beanId, checkType), e);
188+
"Exception occurred while wait the result of HealthChecker[%s] %s check.",
189+
beanId, checkType), e);
197190
health = new Health.Builder().withException(e).status(Status.DOWN).build();
198191
}
199192
result = health.getStatus().equals(Status.UP);
@@ -208,9 +201,7 @@ private boolean doHealthCheck(String beanId, HealthChecker healthChecker, boolea
208201
retryCount += 1;
209202
TimeUnit.MILLISECONDS.sleep(healthChecker.getRetryTimeInterval());
210203
} catch (InterruptedException e) {
211-
logger
212-
.error(ErrorCode.convert("01-23002", retryCount, beanId,
213-
checkType), e);
204+
logger.error(ErrorCode.convert("01-23002", retryCount, beanId, checkType), e);
214205
}
215206
}
216207
} while (isRetry && retryCount < healthChecker.getRetryCount());
@@ -223,12 +214,12 @@ private boolean doHealthCheck(String beanId, HealthChecker healthChecker, boolea
223214
if (!result) {
224215
if (healthChecker.isStrictCheck()) {
225216
logger.error(ErrorCode.convert("01-23001", beanId, checkType, retryCount,
226-
objectMapper.writeValueAsString(health.getDetails()),
227-
healthChecker.isStrictCheck()));
217+
objectMapper.writeValueAsString(health.getDetails()),
218+
healthChecker.isStrictCheck()));
228219
} else {
229220
logger.warn(ErrorCode.convert("01-23001", beanId, checkType, retryCount,
230-
objectMapper.writeValueAsString(health.getDetails()),
231-
healthChecker.isStrictCheck()));
221+
objectMapper.writeValueAsString(health.getDetails()),
222+
healthChecker.isStrictCheck()));
232223
}
233224
}
234225
} catch (JsonProcessingException ex) {
@@ -364,4 +355,55 @@ public int getTimeout() {
364355
}
365356

366357
}
358+
359+
private class AsyncHealthCheckRunnable implements Runnable {
360+
private final String key;
361+
private final HealthChecker value;
362+
private final Map<String, Health> healthMap;
363+
364+
private final AtomicBoolean parallelResult;
365+
366+
private final CountDownLatch countDownLatch;
367+
368+
public AsyncHealthCheckRunnable(String key, HealthChecker value,
369+
Map<String, Health> healthMap,
370+
AtomicBoolean parallelResult, CountDownLatch countDownLatch) {
371+
this.key = key;
372+
this.value = value;
373+
this.healthMap = healthMap;
374+
this.parallelResult = parallelResult;
375+
this.countDownLatch = countDownLatch;
376+
}
377+
378+
@Override
379+
public void run() {
380+
try {
381+
if (!HealthCheckerProcessor.this.doHealthCheck(key, value, false, healthMap, true,
382+
false)) {
383+
parallelResult.set(false);
384+
}
385+
} catch (Throwable t) {
386+
parallelResult.set(false);
387+
logger.error(ErrorCode.convert("01-22004"), t);
388+
healthMap.put(key, new Health.Builder().withException(t).status(Status.DOWN)
389+
.build());
390+
} finally {
391+
countDownLatch.countDown();
392+
}
393+
}
394+
}
395+
396+
private class AsyncHealthCheckCallable implements Callable<Health> {
397+
398+
private final HealthChecker healthChecker;
399+
400+
public AsyncHealthCheckCallable(HealthChecker healthChecker) {
401+
this.healthChecker = healthChecker;
402+
}
403+
404+
@Override
405+
public Health call() throws Exception {
406+
return healthChecker.isHealthy();
407+
}
408+
}
367409
}

sofa-boot-project/sofa-boot-actuator/src/main/java/com/alipay/sofa/boot/actuator/health/HealthIndicatorProcessor.java

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.Map;
4242
import java.util.Optional;
4343
import java.util.Set;
44+
import java.util.concurrent.Callable;
4445
import java.util.concurrent.CopyOnWriteArrayList;
4546
import java.util.concurrent.CountDownLatch;
4647
import java.util.concurrent.ExecutorService;
@@ -167,19 +168,8 @@ public boolean readinessHealthCheck(Map<String, Health> healthMap) {
167168
if (isParallelCheck()) {
168169
CountDownLatch countDownLatch = new CountDownLatch(healthIndicators.size());
169170
AtomicBoolean parallelResult = new AtomicBoolean(true);
170-
healthIndicators.forEach((key, value) -> healthCheckExecutor.execute(() -> {
171-
try {
172-
if (!doHealthCheck(key, value, healthMap, false)) {
173-
parallelResult.set(false);
174-
}
175-
} catch (Throwable t) {
176-
parallelResult.set(false);
177-
logger.error(ErrorCode.convert("01-21003"), t);
178-
healthMap.put(key, new Health.Builder().withException(t).status(Status.DOWN).build());
179-
} finally {
180-
countDownLatch.countDown();
181-
}
182-
}));
171+
healthIndicators.forEach((key, value) -> healthCheckExecutor.execute(
172+
new AsyncHealthIndicatorRunnable(key, value, healthMap, parallelResult, countDownLatch)));
183173
boolean finished = false;
184174
try {
185175
finished = countDownLatch.await(getParallelCheckTimeout(), TimeUnit.MILLISECONDS);
@@ -226,7 +216,7 @@ public boolean doHealthCheck(String beanId, HealthIndicator healthIndicator,
226216
try {
227217
if (wait) {
228218
Future<Health> future = healthCheckExecutor
229-
.submit(healthIndicator::health);
219+
.submit(new AsyncHealthIndicatorCallable(healthIndicator));
230220
health = future.get(timeout, TimeUnit.MILLISECONDS);
231221
} else {
232222
health = healthIndicator.health();
@@ -315,4 +305,55 @@ public void setHealthIndicatorConfig(Map<String, HealthCheckerConfig> healthIndi
315305
public List<BaseStat> getHealthIndicatorStartupStatList() {
316306
return healthIndicatorStartupStatList;
317307
}
308+
309+
private class AsyncHealthIndicatorRunnable implements Runnable {
310+
private final String key;
311+
private final HealthIndicator value;
312+
private final Map<String, Health> healthMap;
313+
314+
private final AtomicBoolean parallelResult;
315+
316+
private final CountDownLatch countDownLatch;
317+
318+
public AsyncHealthIndicatorRunnable(String key, HealthIndicator value,
319+
Map<String, Health> healthMap,
320+
AtomicBoolean parallelResult,
321+
CountDownLatch countDownLatch) {
322+
this.key = key;
323+
this.value = value;
324+
this.healthMap = healthMap;
325+
this.parallelResult = parallelResult;
326+
this.countDownLatch = countDownLatch;
327+
}
328+
329+
@Override
330+
public void run() {
331+
try {
332+
if (!HealthIndicatorProcessor.this.doHealthCheck(key, value, healthMap, false)) {
333+
parallelResult.set(false);
334+
}
335+
} catch (Throwable t) {
336+
parallelResult.set(false);
337+
logger.error(ErrorCode.convert("01-21003"), t);
338+
healthMap.put(key, new Health.Builder().withException(t).status(Status.DOWN)
339+
.build());
340+
} finally {
341+
countDownLatch.countDown();
342+
}
343+
}
344+
}
345+
346+
private class AsyncHealthIndicatorCallable implements Callable<Health> {
347+
348+
private final HealthIndicator healthIndicator;
349+
350+
public AsyncHealthIndicatorCallable(HealthIndicator healthIndicator) {
351+
this.healthIndicator = healthIndicator;
352+
}
353+
354+
@Override
355+
public Health call() throws Exception {
356+
return healthIndicator.health();
357+
}
358+
}
318359
}

sofa-boot-project/sofa-boot-core/isle-sofa-boot/src/main/java/com/alipay/sofa/boot/isle/stage/SpringContextInstallStage.java

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -184,38 +184,12 @@ private void doRefreshSpringContextParallel() {
184184
/**
185185
* Refresh all {@link ApplicationContext} recursively
186186
*/
187-
private void refreshRecursively(DeploymentDescriptor deployment,
188-
CountDownLatch latch, List<Future<?>> futures) {
187+
private void refreshRecursively(DeploymentDescriptor deployment, CountDownLatch latch,
188+
List<Future<?>> futures) {
189189
// if interrupted, moduleRefreshExecutorService will be null;
190190
if (moduleRefreshExecutorService != null) {
191-
futures.add(moduleRefreshExecutorService.submit(() -> {
192-
String oldName = Thread.currentThread().getName();
193-
try {
194-
Thread.currentThread().setName(
195-
"sofa-module-refresh-" + deployment.getModuleName());
196-
if (deployment.isSpringPowered() && !application.getFailed().contains(deployment)) {
197-
refreshAndCollectCost(deployment);
198-
}
199-
DependencyTree.Entry<String, DeploymentDescriptor> entry = application
200-
.getDeployRegistry().getEntry(deployment.getModuleName());
201-
if (entry != null && entry.getDependsOnMe() != null) {
202-
for (DependencyTree.Entry<String, DeploymentDescriptor> child : entry
203-
.getDependsOnMe()) {
204-
child.getDependencies().remove(entry);
205-
if (child.getDependencies().size() == 0) {
206-
refreshRecursively(child.get(), latch, futures);
207-
}
208-
}
209-
}
210-
} catch (Throwable t) {
211-
LOGGER.error(ErrorCode.convert("01-11002", deployment.getName()), t);
212-
throw new RuntimeException(ErrorCode.convert("01-11002", deployment.getName()),
213-
t);
214-
} finally {
215-
latch.countDown();
216-
Thread.currentThread().setName(oldName);
217-
}
218-
}));
191+
futures.add(moduleRefreshExecutorService.submit(new AsyncSpringContextRunnable(
192+
deployment, latch, futures)));
219193
}
220194
}
221195

@@ -291,4 +265,46 @@ public String getName() {
291265
public int getOrder() {
292266
return 20000;
293267
}
268+
269+
private class AsyncSpringContextRunnable implements Runnable {
270+
private final DeploymentDescriptor deployment;
271+
private final CountDownLatch latch;
272+
private final List<Future<?>> futures;
273+
274+
public AsyncSpringContextRunnable(DeploymentDescriptor deployment, CountDownLatch latch,
275+
List<Future<?>> futures) {
276+
this.deployment = deployment;
277+
this.latch = latch;
278+
this.futures = futures;
279+
}
280+
281+
@Override
282+
public void run() {
283+
String oldName = Thread.currentThread().getName();
284+
try {
285+
Thread.currentThread().setName("sofa-module-refresh-" + deployment.getModuleName());
286+
if (deployment.isSpringPowered() && !application.getFailed().contains(deployment)) {
287+
SpringContextInstallStage.this.refreshAndCollectCost(deployment);
288+
}
289+
DependencyTree.Entry<String, DeploymentDescriptor> entry = application
290+
.getDeployRegistry().getEntry(deployment.getModuleName());
291+
if (entry != null && entry.getDependsOnMe() != null) {
292+
for (DependencyTree.Entry<String, DeploymentDescriptor> child : entry
293+
.getDependsOnMe()) {
294+
child.getDependencies().remove(entry);
295+
if (child.getDependencies().size() == 0) {
296+
SpringContextInstallStage.this.refreshRecursively(child.get(), latch,
297+
futures);
298+
}
299+
}
300+
}
301+
} catch (Throwable t) {
302+
LOGGER.error(ErrorCode.convert("01-11002", deployment.getName()), t);
303+
throw new RuntimeException(ErrorCode.convert("01-11002", deployment.getName()), t);
304+
} finally {
305+
latch.countDown();
306+
Thread.currentThread().setName(oldName);
307+
}
308+
}
309+
}
294310
}

sofa-boot-project/sofa-boot-core/runtime-sofa-boot/src/main/java/com/alipay/sofa/runtime/async/AsyncInitializeBeanMethodInvoker.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -75,28 +75,15 @@ public Object invoke(final MethodInvocation invocation) throws Throwable {
7575
if (!isAsyncCalled && methodName.equals(asyncMethodName)) {
7676
isAsyncCalled = true;
7777
isAsyncCalling = true;
78-
asyncInitMethodManager.submitTask(() -> {
79-
try {
80-
long startTime = System.currentTimeMillis();
81-
invocation.getMethod().invoke(targetObject, invocation.getArguments());
82-
LOGGER.info("{}({}) {} method execute {}dms.", targetObject
83-
.getClass().getName(), beanName, methodName, (System
84-
.currentTimeMillis() - startTime));
85-
} catch (Throwable e) {
86-
throw new RuntimeException(e);
87-
} finally {
88-
asyncMethodFinish();
89-
}
90-
});
78+
asyncInitMethodManager.submitTask(new AsyncBeanInitRunnable(invocation));
9179
return null;
9280
}
9381

9482
if (isAsyncCalling) {
9583
long startTime = System.currentTimeMillis();
9684
initCountDownLatch.await();
97-
LOGGER.info("{}({}) {} method wait {}ms.",
98-
targetObject.getClass().getName(), beanName, methodName,
99-
(System.currentTimeMillis() - startTime));
85+
LOGGER.info("{}({}) {} method wait {}ms.", targetObject.getClass().getName(), beanName,
86+
methodName, (System.currentTimeMillis() - startTime));
10087
}
10188
return invocation.getMethod().invoke(targetObject, invocation.getArguments());
10289
}
@@ -105,4 +92,28 @@ void asyncMethodFinish() {
10592
this.initCountDownLatch.countDown();
10693
this.isAsyncCalling = false;
10794
}
95+
96+
private class AsyncBeanInitRunnable implements Runnable {
97+
98+
private final MethodInvocation invocation;
99+
100+
public AsyncBeanInitRunnable(MethodInvocation invocation) {
101+
this.invocation = invocation;
102+
}
103+
104+
@Override
105+
public void run() {
106+
try {
107+
long startTime = System.currentTimeMillis();
108+
invocation.getMethod().invoke(targetObject, invocation.getArguments());
109+
LOGGER.info("{}({}) {} method execute {}dms.", targetObject.getClass().getName(),
110+
beanName, invocation.getMethod().getName(),
111+
(System.currentTimeMillis() - startTime));
112+
} catch (Throwable e) {
113+
throw new RuntimeException(e);
114+
} finally {
115+
AsyncInitializeBeanMethodInvoker.this.asyncMethodFinish();
116+
}
117+
}
118+
}
108119
}

0 commit comments

Comments
 (0)