-
Notifications
You must be signed in to change notification settings - Fork 8.9k
feature: Reuse connection to merge branch transactions #7509
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
f607ee1
4a16d62
78e5a17
2c2f612
69d4a70
485b0b1
ac0fb1a
60846e0
1b9dd78
6c97992
db46bcd
e41c083
a288913
341328e
0ba84a9
941776d
7880439
a724c22
e57effa
0b7b525
e474563
0883c9a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,96 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.seata.integration.tx.api.combine; | ||
|
|
||
| import org.apache.seata.core.context.RootContext; | ||
| import org.apache.seata.rm.datasource.combine.CombineConnectionHolder; | ||
| import org.apache.seata.rm.datasource.combine.CombineContext; | ||
| import org.apache.seata.rm.datasource.xa.ConnectionProxyXA; | ||
| import org.aspectj.lang.ProceedingJoinPoint; | ||
| import org.aspectj.lang.annotation.Around; | ||
| import org.aspectj.lang.annotation.Aspect; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
| import org.springframework.stereotype.Component; | ||
|
|
||
| @Aspect | ||
| @Component | ||
| public class CombineAspect { | ||
| private static final Logger LOGGER = LoggerFactory.getLogger(CombineAspect.class); | ||
|
Check warning on line 33 in integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/combine/CombineAspect.java
|
||
|
|
||
| @Around("@annotation(org.apache.seata.spring.annotation.CombineTransactional)") | ||
| public Object handleCombine(ProceedingJoinPoint joinPoint) throws Throwable { | ||
| if (!RootContext.inGlobalTransaction() || !RootContext.inXABranch()) { | ||
| // not in transaction, or this interceptor is disabled | ||
| return joinPoint.proceed(); | ||
|
Check warning on line 39 in integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/combine/CombineAspect.java
|
||
| } | ||
|
|
||
| if (!CombineContext.set()) { | ||
| // The same global transaction, the aspect does not need to enter | ||
| return joinPoint.proceed(); | ||
|
Check warning on line 44 in integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/combine/CombineAspect.java
|
||
| } | ||
|
|
||
| try { | ||
| // First cut entry | ||
| Object result = joinPoint.proceed(); | ||
|
Check warning on line 49 in integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/combine/CombineAspect.java
|
||
|
|
||
| // doCleanupAfterCompletion marks the end of the transaction, resets and closes the connection | ||
| CombineContext.clear(); | ||
|
Check warning on line 52 in integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/combine/CombineAspect.java
|
||
| // doCommit | ||
| for (ConnectionProxyXA conn : CombineConnectionHolder.getDsConn()) { | ||
| conn.commit(); | ||
| } | ||
| return result; | ||
| } catch (Exception e) { | ||
| LOGGER.error( | ||
| String.format("Failed to handle,xid: %s occur exp msg: %s", RootContext.getXID(), e.getMessage()), | ||
|
Check warning on line 60 in integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/combine/CombineAspect.java
|
||
| e); | ||
| CombineContext.clear(); | ||
|
Check warning on line 62 in integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/combine/CombineAspect.java
|
||
| // doRollback | ||
| for (ConnectionProxyXA conn : CombineConnectionHolder.getDsConn()) { | ||
| conn.rollback(); | ||
| } | ||
| throw e; | ||
|
Check warning on line 67 in integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/combine/CombineAspect.java
|
||
| } finally { | ||
| CombineContext.clear(); | ||
|
Check warning on line 69 in integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/combine/CombineAspect.java
|
||
| for (ConnectionProxyXA conn : CombineConnectionHolder.getDsConn()) { | ||
| try { | ||
| // Reset autocommit (if not autocommitting) | ||
| if (!conn.getAutoCommit()) { | ||
| conn.setAutoCommit(true); | ||
|
Check warning on line 74 in integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/combine/CombineAspect.java
|
||
| } | ||
| } catch (Throwable t) { | ||
|
Check warning on line 76 in integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/combine/CombineAspect.java
|
||
| // Record the exception of resetting the auto-commit, but do not interrupt and continue to try to | ||
| // close | ||
| LOGGER.error("Failed to reset autoCommit to true for connection: {}", conn, t); | ||
| } | ||
|
Check warning on line 80 in integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/combine/CombineAspect.java
|
||
| try { | ||
| if (conn.isClosed()) { | ||
| LOGGER.error("Connection is closed: {}", conn); | ||
|
Check warning on line 83 in integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/combine/CombineAspect.java
|
||
| } | ||
| conn.close(); | ||
| } catch (Throwable t) { | ||
|
Check warning on line 86 in integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/combine/CombineAspect.java
|
||
| // Record the exception of closing the connection, but do not interrupt the loop and continue to | ||
| // process the next connection | ||
| LOGGER.error("Failed to close connection: {}", conn, t); | ||
| } | ||
| } | ||
|
Check warning on line 91 in integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/combine/CombineAspect.java
|
||
| // Clean up local cache connections | ||
| CombineConnectionHolder.clear(); | ||
|
Check warning on line 93 in integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/combine/CombineAspect.java
|
||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.seata.integration.tx.api.combine; | ||
|
|
||
| import org.springframework.context.annotation.ComponentScan; | ||
| import org.springframework.context.annotation.Configuration; | ||
| import org.springframework.context.annotation.EnableAspectJAutoProxy; | ||
|
|
||
| @ComponentScan(basePackages = "org.apache.seata.integration.tx.api.combine") | ||
| @Configuration | ||
| @EnableAspectJAutoProxy(proxyTargetClass = true) | ||
| public class CombineAutoConfiguration {} | ||
|
Check warning on line 26 in integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/combine/CombineAutoConfiguration.java
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.seata.spring.annotation; | ||
|
|
||
| import java.lang.annotation.*; | ||
funky-eyes marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| @Target({ElementType.TYPE, ElementType.METHOD}) | ||
| @Retention(RetentionPolicy.RUNTIME) | ||
| @Documented | ||
| public @interface CombineTransactional {} | ||
funky-eyes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| # | ||
| # Licensed to the Apache Software Foundation (ASF) under one or more | ||
| # contributor license agreements. See the NOTICE file distributed with | ||
| # this work for additional information regarding copyright ownership. | ||
| # The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| # (the "License"); you may not use this file except in compliance with | ||
| # the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| # | ||
| # Auto Configure | ||
| org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ | ||
| org.apache.seata.integration.tx.api.combine.CombineAutoConfiguration |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.seata.rm.datasource.combine; | ||
|
|
||
| import org.apache.seata.core.context.RootContext; | ||
| import org.apache.seata.rm.datasource.xa.ConnectionProxyXA; | ||
|
|
||
| import javax.sql.DataSource; | ||
| import java.sql.SQLException; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.Map; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
|
|
||
| public class CombineConnectionHolder { | ||
| private static final ThreadLocal<Map<String, Map<Object, ConnectionProxyXA>>> CONNECTION_HOLDER = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why use ConcurrentHashMap? Will there be concurrent situations?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there may be asynchronous or parallel branches. |
||
| ThreadLocal.withInitial(ConcurrentHashMap::new); | ||
|
Check warning on line 31 in rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolder.java
|
||
|
|
||
| public static ConnectionProxyXA get(DataSource dataSource) { | ||
| Map<Object, ConnectionProxyXA> connMap = CONNECTION_HOLDER.get().get(RootContext.getXID()); | ||
|
Check warning on line 34 in rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolder.java
|
||
| if (connMap != null) { | ||
| return connMap.get(dataSource); | ||
|
Check warning on line 36 in rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolder.java
|
||
| } | ||
| return null; | ||
|
Check warning on line 38 in rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolder.java
|
||
| } | ||
|
|
||
| public static Collection<ConnectionProxyXA> getDsConn() { | ||
| Map<Object, ConnectionProxyXA> connectionMap = CONNECTION_HOLDER.get().get(RootContext.getXID()); | ||
|
Check warning on line 42 in rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolder.java
|
||
| return connectionMap != null ? connectionMap.values() : Collections.emptyList(); | ||
| } | ||
|
|
||
| public static void putConnection(DataSource dataSource, ConnectionProxyXA connection) throws SQLException { | ||
| Map<String, Map<Object, ConnectionProxyXA>> concurrentHashMap = CONNECTION_HOLDER.get(); | ||
| String xid = RootContext.getXID(); | ||
| Map<Object, ConnectionProxyXA> connectionProxyMap = | ||
| concurrentHashMap.computeIfAbsent(xid, k -> new ConcurrentHashMap<>()); | ||
|
Check warning on line 50 in rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolder.java
|
||
funky-eyes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| if (connectionProxyMap.putIfAbsent(dataSource, connection) == null) { | ||
| connection.setAutoCommit(false); | ||
|
Check warning on line 53 in rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolder.java
|
||
| } | ||
| } | ||
|
Check warning on line 55 in rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolder.java
|
||
|
|
||
| public static void clear() { | ||
| CONNECTION_HOLDER.get().remove(RootContext.getXID()); | ||
| } | ||
|
Check warning on line 59 in rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineConnectionHolder.java
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.seata.rm.datasource.combine; | ||
|
|
||
| import org.apache.seata.core.context.RootContext; | ||
|
|
||
| import java.util.Map; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
|
|
||
| public class CombineContext { | ||
|
Check warning on line 24 in rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineContext.java
|
||
funky-eyes marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| private static final ThreadLocal<Map<String, Boolean>> COMBINE_ASPECT = | ||
| ThreadLocal.withInitial(ConcurrentHashMap::new); | ||
| ; | ||
|
|
||
| /** | ||
| * @return | ||
| * false: The specified key already exists (repeatedly enter the aspect) | ||
| * true: The specified key does not exist (first time entering the aspect) | ||
| */ | ||
| public static boolean set() { | ||
| String xid = RootContext.getXID(); | ||
|
Check warning on line 35 in rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineContext.java
|
||
| if (xid != null) { | ||
| return !Boolean.TRUE.equals(COMBINE_ASPECT.get().putIfAbsent(xid, Boolean.TRUE)); | ||
| } | ||
| return false; | ||
|
Check warning on line 39 in rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineContext.java
|
||
| } | ||
|
|
||
| public static boolean get() { | ||
| String xid = RootContext.getXID(); | ||
| if (xid == null) { | ||
| return false; | ||
| } | ||
| return Boolean.TRUE.equals(COMBINE_ASPECT.get().get(xid)); | ||
| } | ||
|
|
||
| public static void clear() { | ||
| String xid = RootContext.getXID(); | ||
|
Check warning on line 51 in rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineContext.java
|
||
| if (xid != null) { | ||
| COMBINE_ASPECT.get().remove(xid); | ||
|
Check warning on line 53 in rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineContext.java
|
||
| } | ||
| } | ||
|
Check warning on line 55 in rm-datasource/src/main/java/org/apache/seata/rm/datasource/combine/CombineContext.java
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should not rely on Spring, but instead refer to the existing proxy approach and use byte-buddy for proxying.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done