Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Add changes here for all PR submitted to the 2.x branch.
### feature:

- [[#7485](https://github.com/apache/incubator-seata/pull/7485)] Add http request filter for seata-server
- [[#7509](https://github.com/apache/incubator-seata/pull/7509)] Reuse connection to merge branch transactions
- [[#7492](https://github.com/apache/incubator-seata/pull/7492)] upgrade HTTP client in common module to support HTTP/2


Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
### feature:

- [[#7485](https://github.com/apache/incubator-seata/pull/7485)] 给seata-server端的http请求添加过滤器
- [[#7509](https://github.com/apache/incubator-seata/pull/7509)] 复用连接合并分支事务
- [[#7492](https://github.com/apache/incubator-seata/pull/7492)] 升级 common 模块中的 HTTP 客户端以支持 HTTP/2


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ public static boolean inSagaBranch() {
return BranchType.SAGA == getBranchType();
}

public static boolean inXABranch() {
return BranchType.XA == getBranchType();
}

/**
* get the branch type
*
Expand Down
1 change: 0 additions & 1 deletion integration-tx-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,10 @@ public enum InvocationHandlerType {
/**
* SagaAnnotation InvocationHandler
*/
SagaAnnotation
SagaAnnotation,

/**
* CombineTransactional InvocationHandler
*/
CombineTransactional
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.integration.tx.api.interceptor.handler;

import org.apache.seata.config.CachedConfigurationChangeListener;
import org.apache.seata.config.ConfigurationChangeEvent;
import org.apache.seata.core.context.RootContext;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.integration.tx.api.interceptor.InvocationHandlerType;
import org.apache.seata.integration.tx.api.interceptor.InvocationWrapper;
import org.apache.seata.integration.tx.api.interceptor.SeataInterceptorPosition;
import org.apache.seata.integration.tx.api.util.ClassUtils;
import org.apache.seata.rm.datasource.combine.CombineConnectionHolder;
import org.apache.seata.rm.datasource.combine.CombineContext;
import org.apache.seata.rm.datasource.xa.ConnectionProxyXA;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;

/**
* The type Combine transactional interceptor handler.
*
*/
public class CombineTransactionalInterceptorHandler extends AbstractProxyInvocationHandler
implements CachedConfigurationChangeListener {

private static final Logger LOGGER = LoggerFactory.getLogger(CombineTransactionalInterceptorHandler.class);

private Set<String> methodsToProxy;

private static volatile ScheduledThreadPoolExecutor executor;

public CombineTransactionalInterceptorHandler(Set<String> methodsToProxy) {
this.methodsToProxy = methodsToProxy;
}

@Override
protected Object doInvoke(InvocationWrapper invocation) throws Throwable {
Class<?> targetClass = invocation.getTarget().getClass();
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
return handleCombineTransactional(invocation);
}
return invocation.proceed();
}

private Object handleCombineTransactional(final InvocationWrapper methodInvocation) throws Throwable {
if (!RootContext.inGlobalTransaction()) {
// not in transaction, or this interceptor is disabled
return methodInvocation.proceed();
}
RootContext.bindBranchType(BranchType.XA);
if (!CombineContext.set()) {
// The same global transaction, the aspect does not need to enter
return methodInvocation.proceed();
}

try {
// First cut entry
Object result = methodInvocation.proceed();

// doCleanupAfterCompletion marks the end of the transaction, resets and closes the connection
CombineContext.clear();
// doCommit
for (ConnectionProxyXA conn : CombineConnectionHolder.getDsConn()) {
conn.commit();
}
return result;
} catch (Exception e) {
LOGGER.error(
String.format(
"@CombineTransactional failed to handle,xid: %s occur exp msg: %s",
RootContext.getXID(), e.getMessage()),
e);
CombineContext.clear();
// doRollback
for (ConnectionProxyXA conn : CombineConnectionHolder.getDsConn()) {
conn.rollback();
}
throw e;
} finally {
CombineContext.clear();
for (ConnectionProxyXA conn : CombineConnectionHolder.getDsConn()) {
try {
// Reset autocommit (if not autocommitting)
if (!conn.getAutoCommit()) {
conn.setAutoCommit(true);
}
} catch (Throwable t) {
// Record the exception of resetting the auto-commit, but do not interrupt and continue to try to
// close
LOGGER.error("Failed to reset autoCommit to true for connection: {}", conn, t);
}
try {
if (conn.isClosed()) {
LOGGER.warn("Connection is closed: {}", conn);
}
conn.close();
} catch (Throwable t) {
// Record the exception of closing the connection, but do not interrupt the loop and continue to
// process the next connection
LOGGER.error("Failed to close connection: {}", conn, t);
}
}
// Clean up local cache connections
CombineConnectionHolder.clear();
RootContext.unbindBranchType();
}
}

@Override
public void onChangeEvent(ConfigurationChangeEvent event) {}

@Override
public Set<String> getMethodsToProxy() {
return methodsToProxy;
}

@Override
public SeataInterceptorPosition getPosition() {
return SeataInterceptorPosition.AfterTransaction;
}

@Override
public String type() {
return InvocationHandlerType.CombineTransactional.name();
}

@Override
public int order() {
return 1;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.integration.tx.api.interceptor.parser;

import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.ReflectionUtil;
import org.apache.seata.config.CachedConfigurationChangeListener;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.integration.tx.api.interceptor.handler.CombineTransactionalInterceptorHandler;
import org.apache.seata.integration.tx.api.interceptor.handler.ProxyInvocationHandler;
import org.apache.seata.spring.annotation.CombineTransactional;

import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Set;

public class CombineTransactionalInterceptorParser implements InterfaceParser {

protected final Set<String> methodsToProxy = new HashSet<>();

/**
* @param target
* @return
* @throws Exception
* @see CombineTransactional // Combine annotation
*/
@Override
public ProxyInvocationHandler parserInterfaceToProxy(Object target, String objectName) throws Exception {
Class<?> serviceInterface = DefaultTargetClassParser.get().findTargetClass(target);
Class<?>[] interfacesIfJdk = DefaultTargetClassParser.get().findInterfaces(target);

if (existsAnnotation(serviceInterface) || existsAnnotation(interfacesIfJdk)) {
ProxyInvocationHandler proxyInvocationHandler = createProxyInvocationHandler();
ConfigurationFactory.getInstance()
.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (CachedConfigurationChangeListener)
proxyInvocationHandler);
return proxyInvocationHandler;
}

return null;
}

protected ProxyInvocationHandler createProxyInvocationHandler() {
return new CombineTransactionalInterceptorHandler(methodsToProxy);
}

@Override
public IfNeedEnhanceBean parseIfNeedEnhancement(Class<?> beanClass) {
Set<Class<?>> interfaceClasses = ReflectionUtil.getInterfaces(beanClass);
Class<?>[] interfaceClasseArray = interfaceClasses.toArray(new Class<?>[0]);

IfNeedEnhanceBean ifNeedEnhanceBean = new IfNeedEnhanceBean();
if (existsAnnotation(beanClass) || existsAnnotation(interfaceClasseArray)) {
ifNeedEnhanceBean.setIfNeed(true);
ifNeedEnhanceBean.setNeedEnhanceEnum(NeedEnhanceEnum.SERVICE_BEAN);
}
return ifNeedEnhanceBean;
}

protected boolean existsAnnotation(Class<?>... classes) {
boolean result = false;
if (CollectionUtils.isNotEmpty(classes)) {
for (Class<?> clazz : classes) {
if (clazz == null) {
continue;
}
CombineTransactional trxAnno = clazz.getAnnotation(CombineTransactional.class);
if (trxAnno != null) {
return true;
}
Method[] methods = clazz.getMethods();
for (Method method : methods) {
trxAnno = method.getAnnotation(CombineTransactional.class);
if (trxAnno != null) {
methodsToProxy.add(method.getName());
result = true;
}
}
}
}
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.spring.annotation;

import java.lang.annotation.*;

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CombineTransactional {}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.seata.integration.tx.api.interceptor.parser.GlobalTransactionalInterceptorParser
org.apache.seata.integration.tx.api.interceptor.parser.GlobalTransactionalInterceptorParser
org.apache.seata.integration.tx.api.interceptor.parser.CombineTransactionalInterceptorParser
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.integration.tx.api.interceptor.parser;

import org.apache.seata.spring.annotation.CombineTransactional;
import org.apache.seata.spring.annotation.GlobalTransactional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The type Business.
*/
@GlobalTransactional(timeoutMills = 300000, name = "busi-doBiz")
@CombineTransactional
public class BusinessCombineImpl implements Business {
private static final Logger LOGGER = LoggerFactory.getLogger(BusinessCombineImpl.class);

@Override
public String doBiz(String msg) {
LOGGER.info("Business doBiz");
return "hello " + msg;
}
}
Loading
Loading