Skip to content

Commit a9e19b7

Browse files
authored
feature: Reuse connection to merge branch transactions (#7509)
1 parent f8e0bd5 commit a9e19b7

File tree

17 files changed

+676
-13
lines changed

17 files changed

+676
-13
lines changed

changes/en-us/2.x.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ Add changes here for all PR submitted to the 2.x branch.
2121
### feature:
2222

2323
- [[#7485](https://github.com/apache/incubator-seata/pull/7485)] Add http request filter for seata-server
24+
- [[#7509](https://github.com/apache/incubator-seata/pull/7509)] Reuse connection to merge branch transactions
2425
- [[#7492](https://github.com/apache/incubator-seata/pull/7492)] upgrade HTTP client in common module to support HTTP/2
2526

2627

changes/zh-cn/2.x.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
### feature:
2222

2323
- [[#7485](https://github.com/apache/incubator-seata/pull/7485)] 给seata-server端的http请求添加过滤器
24+
- [[#7509](https://github.com/apache/incubator-seata/pull/7509)] 复用连接合并分支事务
2425
- [[#7492](https://github.com/apache/incubator-seata/pull/7492)] 升级 common 模块中的 HTTP 客户端以支持 HTTP/2
2526

2627

core/src/main/java/org/apache/seata/core/context/RootContext.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ private RootContext() {}
9191

9292
private static BranchType DEFAULT_BRANCH_TYPE;
9393

94+
public static final String KEY_COMBINE_TRANSACTION_FLAG = "TX_COMBINE";
95+
9496
public static void setDefaultBranchType(BranchType defaultBranchType) {
9597
if (defaultBranchType != AT && defaultBranchType != XA) {
9698
throw new IllegalArgumentException("The default branch type must be " + AT + " or " + XA + "."
@@ -207,6 +209,10 @@ public static boolean inSagaBranch() {
207209
return BranchType.SAGA == getBranchType();
208210
}
209211

212+
public static boolean inXABranch() {
213+
return BranchType.XA == getBranchType();
214+
}
215+
210216
/**
211217
* get the branch type
212218
*
@@ -282,4 +288,16 @@ public static void assertNotInGlobalTransaction() {
282288
public static Map<String, Object> entries() {
283289
return CONTEXT_HOLDER.entries();
284290
}
291+
292+
public static boolean inCombineTransaction() {
293+
return CONTEXT_HOLDER.get(KEY_COMBINE_TRANSACTION_FLAG) != null;
294+
}
295+
296+
public static void bindCombineTransaction() {
297+
CONTEXT_HOLDER.put(KEY_COMBINE_TRANSACTION_FLAG, true);
298+
}
299+
300+
public static void unbindCombineTransaction() {
301+
CONTEXT_HOLDER.remove(KEY_COMBINE_TRANSACTION_FLAG);
302+
}
285303
}

integration-tx-api/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@
6161
<groupId>net.bytebuddy</groupId>
6262
<artifactId>byte-buddy</artifactId>
6363
</dependency>
64-
6564
</dependencies>
6665

6766
</project>

integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/InvocationHandlerType.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,10 @@ public enum InvocationHandlerType {
3434
/**
3535
* SagaAnnotation InvocationHandler
3636
*/
37-
SagaAnnotation
37+
SagaAnnotation,
38+
39+
/**
40+
* CombineTransactional InvocationHandler
41+
*/
42+
CombineTransactional
3843
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.integration.tx.api.interceptor.handler;
18+
19+
import org.apache.seata.core.context.RootContext;
20+
import org.apache.seata.integration.tx.api.interceptor.InvocationHandlerType;
21+
import org.apache.seata.integration.tx.api.interceptor.InvocationWrapper;
22+
import org.apache.seata.integration.tx.api.interceptor.SeataInterceptorPosition;
23+
import org.apache.seata.integration.tx.api.util.ClassUtils;
24+
import org.apache.seata.rm.datasource.combine.CombineConnectionHolder;
25+
import org.apache.seata.rm.datasource.xa.ConnectionProxyXA;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import java.lang.reflect.Method;
30+
import java.util.Set;
31+
import java.util.concurrent.ScheduledThreadPoolExecutor;
32+
33+
/**
34+
* The type Combine transactional interceptor handler.
35+
*
36+
*/
37+
public class CombineTransactionalInterceptorHandler extends AbstractProxyInvocationHandler {
38+
39+
private static final Logger LOGGER = LoggerFactory.getLogger(CombineTransactionalInterceptorHandler.class);
40+
41+
private Set<String> methodsToProxy;
42+
43+
private static volatile ScheduledThreadPoolExecutor executor;
44+
45+
public CombineTransactionalInterceptorHandler(Set<String> methodsToProxy) {
46+
this.methodsToProxy = methodsToProxy;
47+
}
48+
49+
@Override
50+
protected Object doInvoke(InvocationWrapper invocation) throws Throwable {
51+
Class<?> targetClass = invocation.getTarget().getClass();
52+
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
53+
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
54+
return handleCombineTransactional(invocation);
55+
}
56+
return invocation.proceed();
57+
}
58+
59+
private Object handleCombineTransactional(final InvocationWrapper methodInvocation) throws Throwable {
60+
if (!RootContext.inGlobalTransaction()) {
61+
// not in transaction, or this interceptor is disabled
62+
return methodInvocation.proceed();
63+
}
64+
65+
RootContext.bindCombineTransaction();
66+
67+
try {
68+
Object result = methodInvocation.proceed();
69+
70+
for (ConnectionProxyXA conn : CombineConnectionHolder.getDsConn()) {
71+
conn.setCombine(false);
72+
conn.commit();
73+
}
74+
return result;
75+
} catch (Exception e) {
76+
LOGGER.error(
77+
String.format(
78+
"@CombineTransactional failed to handle,xid: %s occur exp msg: %s",
79+
RootContext.getXID(), e.getMessage()),
80+
e);
81+
// doRollback
82+
for (ConnectionProxyXA conn : CombineConnectionHolder.getDsConn()) {
83+
conn.setCombine(false);
84+
conn.rollback();
85+
}
86+
throw e;
87+
} finally {
88+
for (ConnectionProxyXA conn : CombineConnectionHolder.getDsConn()) {
89+
try {
90+
// Reset autocommit (if not autocommitting)
91+
if (!conn.getAutoCommit()) {
92+
conn.setAutoCommit(true);
93+
}
94+
} catch (Throwable t) {
95+
// Record the exception of resetting the auto-commit, but do not interrupt and continue to try to
96+
// close
97+
LOGGER.error("Failed to reset autoCommit to true for connection: {}", conn, t);
98+
}
99+
try {
100+
if (conn.isClosed()) {
101+
LOGGER.warn("Connection is closed: {}", conn);
102+
}
103+
conn.close();
104+
} catch (Throwable t) {
105+
// Record the exception of closing the connection, but do not interrupt the loop and continue to
106+
// process the next connection
107+
LOGGER.error("Failed to close connection: {}", conn, t);
108+
}
109+
}
110+
// Clean up local cache connections
111+
CombineConnectionHolder.clear();
112+
RootContext.unbindCombineTransaction();
113+
}
114+
}
115+
116+
@Override
117+
public Set<String> getMethodsToProxy() {
118+
return methodsToProxy;
119+
}
120+
121+
@Override
122+
public SeataInterceptorPosition getPosition() {
123+
return SeataInterceptorPosition.AfterTransaction;
124+
}
125+
126+
@Override
127+
public String type() {
128+
return InvocationHandlerType.CombineTransactional.name();
129+
}
130+
131+
@Override
132+
public int order() {
133+
return 1;
134+
}
135+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.integration.tx.api.interceptor.parser;
18+
19+
import org.apache.seata.common.util.CollectionUtils;
20+
import org.apache.seata.common.util.ReflectionUtil;
21+
import org.apache.seata.integration.tx.api.interceptor.handler.CombineTransactionalInterceptorHandler;
22+
import org.apache.seata.integration.tx.api.interceptor.handler.ProxyInvocationHandler;
23+
import org.apache.seata.spring.annotation.CombineTransactional;
24+
25+
import java.lang.reflect.Method;
26+
import java.util.HashSet;
27+
import java.util.Set;
28+
29+
public class CombineTransactionalInterceptorParser implements InterfaceParser {
30+
31+
protected final Set<String> methodsToProxy = new HashSet<>();
32+
33+
/**
34+
* @param target
35+
* @return
36+
* @throws Exception
37+
* @see CombineTransactional // Combine annotation
38+
*/
39+
@Override
40+
public ProxyInvocationHandler parserInterfaceToProxy(Object target, String objectName) throws Exception {
41+
Class<?> serviceInterface = DefaultTargetClassParser.get().findTargetClass(target);
42+
Class<?>[] interfacesIfJdk = DefaultTargetClassParser.get().findInterfaces(target);
43+
44+
if (existsAnnotation(serviceInterface) || existsAnnotation(interfacesIfJdk)) {
45+
return createProxyInvocationHandler();
46+
}
47+
48+
return null;
49+
}
50+
51+
protected ProxyInvocationHandler createProxyInvocationHandler() {
52+
return new CombineTransactionalInterceptorHandler(methodsToProxy);
53+
}
54+
55+
@Override
56+
public IfNeedEnhanceBean parseIfNeedEnhancement(Class<?> beanClass) {
57+
Set<Class<?>> interfaceClasses = ReflectionUtil.getInterfaces(beanClass);
58+
Class<?>[] interfaceClasseArray = interfaceClasses.toArray(new Class<?>[0]);
59+
60+
IfNeedEnhanceBean ifNeedEnhanceBean = new IfNeedEnhanceBean();
61+
if (existsAnnotation(beanClass) || existsAnnotation(interfaceClasseArray)) {
62+
ifNeedEnhanceBean.setIfNeed(true);
63+
ifNeedEnhanceBean.setNeedEnhanceEnum(NeedEnhanceEnum.SERVICE_BEAN);
64+
}
65+
return ifNeedEnhanceBean;
66+
}
67+
68+
protected boolean existsAnnotation(Class<?>... classes) {
69+
boolean result = false;
70+
if (CollectionUtils.isNotEmpty(classes)) {
71+
for (Class<?> clazz : classes) {
72+
if (clazz == null) {
73+
continue;
74+
}
75+
CombineTransactional trxAnno = clazz.getAnnotation(CombineTransactional.class);
76+
if (trxAnno != null) {
77+
return true;
78+
}
79+
Method[] methods = clazz.getMethods();
80+
for (Method method : methods) {
81+
trxAnno = method.getAnnotation(CombineTransactional.class);
82+
if (trxAnno != null) {
83+
methodsToProxy.add(method.getName());
84+
result = true;
85+
}
86+
}
87+
}
88+
}
89+
return result;
90+
}
91+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.spring.annotation;
18+
19+
import java.lang.annotation.Documented;
20+
import java.lang.annotation.ElementType;
21+
import java.lang.annotation.Retention;
22+
import java.lang.annotation.RetentionPolicy;
23+
import java.lang.annotation.Target;
24+
25+
/**
26+
* The interface Combine transactional.
27+
* This annotation is only valid in XA mode.
28+
*/
29+
@Target({ElementType.TYPE, ElementType.METHOD})
30+
@Retention(RetentionPolicy.RUNTIME)
31+
@Documented
32+
public @interface CombineTransactional {}

integration-tx-api/src/main/resources/META-INF/services/org.apache.seata.integration.tx.api.interceptor.parser.InterfaceParser

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616
#
17-
org.apache.seata.integration.tx.api.interceptor.parser.GlobalTransactionalInterceptorParser
17+
org.apache.seata.integration.tx.api.interceptor.parser.GlobalTransactionalInterceptorParser
18+
org.apache.seata.integration.tx.api.interceptor.parser.CombineTransactionalInterceptorParser
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.integration.tx.api.interceptor.parser;
18+
19+
import org.apache.seata.spring.annotation.CombineTransactional;
20+
import org.apache.seata.spring.annotation.GlobalTransactional;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
24+
/**
25+
* The type Business.
26+
*/
27+
@GlobalTransactional(timeoutMills = 300000, name = "busi-doBiz")
28+
@CombineTransactional
29+
public class BusinessCombineImpl implements Business {
30+
private static final Logger LOGGER = LoggerFactory.getLogger(BusinessCombineImpl.class);
31+
32+
@Override
33+
public String doBiz(String msg) {
34+
LOGGER.info("Business doBiz");
35+
return "hello " + msg;
36+
}
37+
}

0 commit comments

Comments
 (0)