Skip to content

Commit 04447c7

Browse files
authored
optimize: enhance close() logic of discovery module (#7375)
1 parent bd717a6 commit 04447c7

File tree

14 files changed

+301
-76
lines changed

14 files changed

+301
-76
lines changed

changes/en-us/2.x.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ Add changes here for all PR submitted to the 2.x branch.
4848
- [[#7360](https://github.com/apache/incubator-seata/pull/7360)] Update resource cleanup logic for channel disconnection
4949
- [[#7363](https://github.com/apache/incubator-seata/pull/7363)] Upgrade npmjs dependencies
5050
- [[#7372](https://github.com/apache/incubator-seata/pull/7372)] optimize license ignore
51+
- [[#7375](https://github.com/apache/incubator-seata/pull/7375)] optimize close() logic of discovery module
5152
- [[#7388](https://github.com/apache/incubator-seata/pull/7388)] optimize binary packaging directory structure
5253
- [[#7412](https://github.com/apache/incubator-seata/pull/7412)] Helm template adapted to the new version of seata
5354
- [[#7414](https://github.com/apache/incubator-seata/pull/7414)] Remove the unused defaultEventExecutorGroup from the NettyClientBootstrap
@@ -88,7 +89,6 @@ Add changes here for all PR submitted to the 2.x branch.
8889
- [[#7379](https://github.com/apache/incubator-seata/issues/7379)] add UT for TccAnnotationProcessor class
8990
- [[#7422](https://github.com/apache/incubator-seata/pull/7422)] add UT for seata-spring-boot-starter module
9091

91-
9292
### refactor:
9393

9494
- [[#7315](https://github.com/apache/incubator-seata/pull/7315)] Refactor log testing to use ListAppender for more accurate and efficient log capture

changes/zh-cn/2.x.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
- [[#7360](https://github.com/apache/incubator-seata/pull/7360)] 更新通道断开连接时的资源清理逻辑
4949
- [[#7363](https://github.com/apache/incubator-seata/pull/7363)] 升级 npmjs 依赖项
5050
- [[#7372](https://github.com/apache/incubator-seata/pull/7372)] 改进忽略许可证标头检查
51+
- [[#7375](https://github.com/apache/incubator-seata/pull/7375)] 优化 discovery 模块的 close 方法
5152
- [[#7388](https://github.com/apache/incubator-seata/pull/7388)] 优化二进制打包目录结构
5253
- [[#7412](https://github.com/apache/incubator-seata/pull/7412)] 适配新版本 Seata 的 Helm 模板
5354
- [[#7414](https://github.com/apache/incubator-seata/pull/7414)] 移除 NettyClientBootstrap 中 defaultEventExecutorGroup

discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,23 @@ void stop() {
364364

365365
@Override
366366
public void close() throws Exception {
367-
client = null;
368-
}
367+
notifiers.values().forEach(ConsulNotifier::stop);
368+
notifiers.clear();
369+
370+
// Shut down the ThreadPoolExecutor
371+
if (notifierExecutor != null && !notifierExecutor.isShutdown()) {
372+
notifierExecutor.shutdown();
373+
try {
374+
if (!notifierExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
375+
notifierExecutor.shutdownNow();
376+
}
377+
} catch (InterruptedException e) {
378+
notifierExecutor.shutdownNow();
379+
} finally {
380+
notifierExecutor = null;
381+
}
382+
}
369383

384+
RegistryHeartBeats.close(REGISTRY_TYPE);
385+
}
370386
}
Lines changed: 72 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,33 +16,40 @@
1616
*/
1717
package org.apache.seata.discovery.registry.consul;
1818

19+
import static org.junit.jupiter.api.Assertions.assertNull;
20+
import static org.junit.jupiter.api.Assertions.assertTrue;
21+
import static org.mockito.Mockito.any;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.verify;
24+
import static org.mockito.Mockito.when;
25+
1926
import com.ecwid.consul.transport.RawResponse;
2027
import com.ecwid.consul.v1.ConsulClient;
2128
import com.ecwid.consul.v1.Response;
2229
import com.ecwid.consul.v1.health.model.HealthService;
23-
import org.apache.seata.config.Configuration;
24-
import org.apache.seata.config.ConfigurationFactory;
25-
import org.apache.seata.config.exception.ConfigNotFoundException;
26-
import org.junit.jupiter.api.Assertions;
27-
import org.junit.jupiter.api.BeforeEach;
28-
import org.junit.jupiter.api.Test;
29-
import org.mockito.MockedStatic;
30-
import org.mockito.Mockito;
3130

3231
import java.lang.reflect.Field;
3332
import java.net.InetSocketAddress;
3433
import java.util.ArrayList;
3534
import java.util.List;
3635
import java.util.concurrent.ConcurrentMap;
3736
import java.util.concurrent.ExecutorService;
37+
import java.util.concurrent.TimeUnit;
3838

39-
import static org.mockito.Mockito.any;
40-
import static org.mockito.Mockito.mock;
41-
import static org.mockito.Mockito.verify;
42-
import static org.mockito.Mockito.when;
43-
39+
import org.apache.seata.config.Configuration;
40+
import org.apache.seata.config.ConfigurationFactory;
41+
import org.apache.seata.config.exception.ConfigNotFoundException;
42+
import org.junit.jupiter.api.Assertions;
43+
import org.junit.jupiter.api.BeforeEach;
44+
import org.junit.jupiter.api.MethodOrderer;
45+
import org.junit.jupiter.api.Order;
46+
import org.junit.jupiter.api.Test;
47+
import org.junit.jupiter.api.TestMethodOrder;
48+
import org.mockito.MockedStatic;
49+
import org.mockito.Mockito;
4450

45-
public class ConsulRegistryServiceImplTest {
51+
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
52+
public class ConsulRegistryServiceImplMockTest {
4653

4754
final String TEST_CLUSTER_NAME = "testCluster";
4855

@@ -52,18 +59,22 @@ public class ConsulRegistryServiceImplTest {
5259

5360
@BeforeEach
5461
public void init() throws Exception {
62+
configuration = mock(Configuration.class);
5563
service = (ConsulRegistryServiceImpl) new ConsulRegistryProvider().provide();
5664
client = mock(ConsulClient.class);
57-
this.setClient(service, client);
5865

59-
configuration = mock(Configuration.class);
66+
Field clientField = ConsulRegistryServiceImpl.class.getDeclaredField("client");
67+
clientField.setAccessible(true);
68+
clientField.set(service, client);
6069
}
6170

71+
@Order(1)
6272
@Test
6373
public void testGetInstance() {
6474
Assertions.assertEquals(ConsulRegistryServiceImpl.getInstance(), service);
6575
}
6676

77+
@Order(2)
6778
@Test
6879
public void testRegister() throws Exception {
6980
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 8080);
@@ -77,6 +88,7 @@ public void testRegister() throws Exception {
7788
verify(client).agentServiceDeregister(any(), any());
7889
}
7990

91+
@Order(3)
8092
@Test
8193
public void testSubscribeAndLookup() throws Exception {
8294
ConsulListener consulListener = mock(ConsulListener.class);
@@ -110,14 +122,54 @@ public void testSubscribeAndLookup() throws Exception {
110122
}
111123

112124
service.unsubscribe(TEST_CLUSTER_NAME, consulListener);
113-
Assertions.assertNull(getMap("notifiers").get(TEST_CLUSTER_NAME));
125+
assertNull(getMap("notifiers").get(TEST_CLUSTER_NAME));
114126
}
115127

128+
@Order(4)
129+
@Test
130+
public void testClose() throws Exception {
131+
ExecutorService executorService1 = mockExecutorService(false, new InterruptedException("Test interruption"));
132+
service.close();
133+
verifyCloseResults(executorService1, true);
116134

117-
private void setClient(ConsulRegistryServiceImpl service, ConsulClient client) throws Exception {
118-
Field clientField = ConsulRegistryServiceImpl.class.getDeclaredField("client");
135+
ExecutorService executorService = mockExecutorService(false, null);
136+
service.close();
137+
138+
verifyCloseResults(executorService, true);
139+
}
140+
141+
private ExecutorService mockExecutorService(boolean awaitTerminationResult, InterruptedException exception) throws Exception {
142+
ExecutorService executorService = mock(ExecutorService.class);
143+
when(executorService.isShutdown()).thenReturn(false);
144+
145+
if (exception != null) {
146+
when(executorService.awaitTermination(5, TimeUnit.SECONDS)).thenThrow(exception);
147+
} else {
148+
when(executorService.awaitTermination(5, TimeUnit.SECONDS)).thenReturn(awaitTerminationResult);
149+
}
150+
151+
setExecutorService(executorService);
152+
return executorService;
153+
}
154+
155+
/**
156+
* Verify the results of the closure method
157+
*/
158+
private void verifyCloseResults(ExecutorService executorService, boolean expectShutdownNow) throws Exception {
159+
verify(executorService).shutdown();
160+
verify(executorService).awaitTermination(5, TimeUnit.SECONDS);
161+
if (expectShutdownNow) {
162+
verify(executorService).shutdownNow();
163+
}
164+
165+
Field clientField = ConsulRegistryServiceImpl.class.getDeclaredField("notifiers");
119166
clientField.setAccessible(true);
120-
clientField.set(service, client);
167+
ConcurrentMap notifiers = (ConcurrentMap)clientField.get(service);
168+
assertTrue(notifiers.isEmpty());
169+
170+
Field executorServiceField = ConsulRegistryServiceImpl.class.getDeclaredField("notifierExecutor");
171+
executorServiceField.setAccessible(true);
172+
assertNull(executorServiceField.get(service));
121173
}
122174

123175
private void setExecutorService(ExecutorService executorService) throws Exception {

discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryHeartBeats.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,10 @@ public static void addHeartBeat(String registryType, InetSocketAddress serverAdd
7575
}, period, period, TimeUnit.MILLISECONDS);
7676
}
7777

78-
public static void close() {
79-
HEARTBEAT_SCHEDULED.shutdown();
78+
public static void close(String registryType) {
79+
if (getHeartbeatEnabled(registryType)) {
80+
HEARTBEAT_SCHEDULED.shutdown();
81+
}
8082
}
8183

8284
private static long getHeartbeatPeriod(String registryType) {

discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -225,12 +225,34 @@ public void onCompleted() {}
225225

226226
@Override
227227
public void close() throws Exception {
228-
if (lifeKeeper != null) {
229-
lifeKeeper.stop();
230-
if (lifeKeeperFuture != null) {
231-
lifeKeeperFuture.get(3, TimeUnit.SECONDS);
228+
// Shut down the ThreadPoolExecutor
229+
if (executorService != null && !executorService.isShutdown()) {
230+
executorService.shutdown();
231+
232+
try {
233+
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
234+
executorService.shutdownNow();
235+
}
236+
} catch (InterruptedException e) {
237+
LOGGER.warn("ExecutorService shutdown interrupted. Forcing shutdown.");
238+
executorService.shutdownNow();
239+
} finally {
240+
executorService = null;
241+
}
242+
}
243+
244+
// Close the Etcd client and release the underlying connection
245+
if (client != null) {
246+
try {
247+
client.close();
248+
} catch (Exception e) {
249+
LOGGER.warn("Failed to close Etcd client: {}", e.getMessage());
250+
} finally {
251+
client = null;
232252
}
233253
}
254+
255+
RegistryHeartBeats.close(REGISTRY_TYPE);
234256
}
235257

236258
/**
Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.seata.discovery.registry.etcd;
17+
package org.apache.seata.discovery.registry.etcd3;
1818

19-
import org.apache.seata.discovery.registry.etcd3.EtcdRegistryProvider;
20-
import org.apache.seata.discovery.registry.etcd3.EtcdRegistryServiceImpl;
2119
import org.junit.jupiter.api.Test;
2220

2321
import static org.assertj.core.api.Assertions.assertThat;

0 commit comments

Comments
 (0)