Skip to content

Commit 28b3413

Browse files
authored
bugfix : check if table meta cache should be refreshed in AT mode (apache#4734)
1 parent 00ca98c commit 28b3413

10 files changed

Lines changed: 225 additions & 43 deletions

File tree

changes/en-us/develop.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Add changes here for all PR submitted to the develop branch.
2020
- [[#5299](https://github.com/seata/seata/pull/5299)] fix GlobalSession deletion when retry rollback or retry commit timeout
2121
- [[#5307](https://github.com/seata/seata/pull/5307)] fix that keywords don't add escaped characters
2222
- [[#5311](https://github.com/seata/seata/pull/5311)] remove RollbackRetryTimeout sessions during in file storage recover
23+
- [[#4734](https://github.com/seata/seata/pull/4734)] check if table meta cache should be refreshed in AT mode
2324
- [[#5316](https://github.com/seata/seata/pull/5316)] fix G1 jvm parameter in jdk8
2425
- [[#5321](https://github.com/seata/seata/pull/5321)] fix When the rollback logic on the TC side returns RollbackFailed, the custom FailureHandler is not executed
2526
- [[#5332](https://github.com/seata/seata/pull/5332)] fix bugs found in unit tests

changes/zh-cn/develop.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
- [[#5299](https://github.com/seata/seata/pull/5299)] 修复TC端重试回滚或重试提交超时GlobalSession的删除问题
2121
- [[#5307](https://github.com/seata/seata/pull/5307)] 修复生成update前后镜像sql不对关键字转义的bug
2222
- [[#5311](https://github.com/seata/seata/pull/5311)] 移除基于文件存储恢复时的RollbackRetryTimeout事务
23+
- [[#4734](https://github.com/seata/seata/pull/4734)] 修复AT模式下新增字段产生的字段找不到
2324
- [[#5316](https://github.com/seata/seata/pull/5316)] 修复jdk8 中 G1 参数
2425
- [[#5321](https://github.com/seata/seata/pull/5321)] 修复当TC端回滚返回RollbackFailed时,自定义FailureHandler的方法未执行
2526
- [[#5332](https://github.com/seata/seata/pull/5332)] 修复单元测试中发现的bug

rm-datasource/src/main/java/io/seata/rm/datasource/DataSourceProxy.java

Lines changed: 12 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,13 @@
1919
import java.sql.PreparedStatement;
2020
import java.sql.ResultSet;
2121
import java.sql.SQLException;
22-
import java.util.concurrent.ScheduledExecutorService;
23-
import java.util.concurrent.ScheduledThreadPoolExecutor;
24-
import java.util.concurrent.TimeUnit;
22+
2523
import javax.sql.DataSource;
2624

25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
2728
import io.seata.common.Constants;
28-
import io.seata.common.thread.NamedThreadFactory;
29-
import io.seata.config.ConfigurationFactory;
30-
import io.seata.common.ConfigurationKeys;
3129
import io.seata.core.constants.DBType;
3230
import io.seata.core.context.RootContext;
3331
import io.seata.core.model.BranchType;
@@ -36,11 +34,6 @@
3634
import io.seata.rm.datasource.sql.struct.TableMetaCacheFactory;
3735
import io.seata.rm.datasource.util.JdbcUtils;
3836
import io.seata.sqlparser.util.JdbcConstants;
39-
import org.slf4j.Logger;
40-
import org.slf4j.LoggerFactory;
41-
42-
import static io.seata.common.DefaultValues.DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE;
43-
import static io.seata.common.DefaultValues.DEFAULT_TABLE_META_CHECKER_INTERVAL;
4437

4538
/**
4639
* The type Data source proxy.
@@ -65,21 +58,6 @@ public class DataSourceProxy extends AbstractDataSourceProxy implements Resource
6558

6659
private String version;
6760

68-
/**
69-
* Enable the table meta checker
70-
*/
71-
private static boolean ENABLE_TABLE_META_CHECKER_ENABLE = ConfigurationFactory.getInstance().getBoolean(
72-
ConfigurationKeys.CLIENT_TABLE_META_CHECK_ENABLE, DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE);
73-
74-
/**
75-
* Table meta checker interval
76-
*/
77-
private static final long TABLE_META_CHECKER_INTERVAL = ConfigurationFactory.getInstance().getLong(
78-
ConfigurationKeys.CLIENT_TABLE_META_CHECKER_INTERVAL, DEFAULT_TABLE_META_CHECKER_INTERVAL);
79-
80-
private final ScheduledExecutorService tableMetaExecutor = new ScheduledThreadPoolExecutor(1,
81-
new NamedThreadFactory("tableMetaChecker", 1, true));
82-
8361
/**
8462
* Instantiates a new Data source proxy.
8563
*
@@ -120,20 +98,18 @@ private void init(DataSource dataSource, String resourceGroupId) {
12098
}
12199
initResourceId();
122100
DefaultResourceManager.get().registerResource(this);
123-
if (ENABLE_TABLE_META_CHECKER_ENABLE) {
124-
tableMetaExecutor.scheduleAtFixedRate(() -> {
125-
try (Connection connection = dataSource.getConnection()) {
126-
TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
127-
.refresh(connection, DataSourceProxy.this.getResourceId());
128-
} catch (Exception ignore) {
129-
}
130-
}, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
131-
}
132-
101+
TableMetaCacheFactory.registerTableMeta(this);
133102
//Set the default branch type to 'AT' in the RootContext.
134103
RootContext.setDefaultBranchType(this.getBranchType());
135104
}
136105

106+
/**
107+
* publish tableMeta refresh event
108+
*/
109+
public void tableMetaRefreshEvent() {
110+
TableMetaCacheFactory.tableMetaRefreshEvent(this.getResourceId());
111+
}
112+
137113
/**
138114
* Gets plain connection.
139115
*
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 1999-2019 Seata.io Group.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package io.seata.rm.datasource.exception;
14+
15+
import java.sql.SQLException;
16+
17+
/**
18+
* The type TableMetaException exception.
19+
*
20+
* @author Bughue
21+
*/
22+
public class TableMetaException extends SQLException {
23+
private String columnName;
24+
private String tableName;
25+
26+
public TableMetaException(String tableName, String columnName) {
27+
this.columnName = columnName;
28+
this.tableName = tableName;
29+
}
30+
31+
public String getTableName() {
32+
return tableName;
33+
}
34+
35+
public String getColumnName() {
36+
return columnName;
37+
}
38+
}

rm-datasource/src/main/java/io/seata/rm/datasource/exec/AbstractDMLBaseExecutor.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.seata.rm.datasource.ConnectionContext;
3131
import io.seata.rm.datasource.ConnectionProxy;
3232
import io.seata.rm.datasource.StatementProxy;
33+
import io.seata.rm.datasource.exception.TableMetaException;
3334
import io.seata.rm.datasource.sql.struct.TableRecords;
3435
import io.seata.sqlparser.SQLRecognizer;
3536
import org.slf4j.Logger;
@@ -93,11 +94,18 @@ public T doExecute(Object... args) throws Throwable {
9394
* @throws Exception the exception
9495
*/
9596
protected T executeAutoCommitFalse(Object[] args) throws Exception {
96-
TableRecords beforeImage = beforeImage();
97-
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
98-
TableRecords afterImage = afterImage(beforeImage);
99-
prepareUndoLog(beforeImage, afterImage);
100-
return result;
97+
try {
98+
TableRecords beforeImage = beforeImage();
99+
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
100+
TableRecords afterImage = afterImage(beforeImage);
101+
prepareUndoLog(beforeImage, afterImage);
102+
return result;
103+
} catch (TableMetaException e) {
104+
LOGGER.error("table meta will be refreshed later, due to TableMetaException, table:{}, column:{}",
105+
e.getTableName(), e.getColumnName());
106+
statementProxy.getConnectionProxy().getDataSourceProxy().tableMetaRefreshEvent();
107+
throw e;
108+
}
101109
}
102110

103111
private boolean isMultiPk() {

rm-datasource/src/main/java/io/seata/rm/datasource/exec/mysql/MySQLUpdateJoinExecutor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ protected TableMeta getTableMeta(String tableName) {
257257
* @param afterImage the after image
258258
* @return sql undo log
259259
*/
260+
@Override
260261
protected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterImage) {
261262
SQLType sqlType = sqlRecognizer.getSQLType();
262263
String tableName = beforeImage.getTableName();

rm-datasource/src/main/java/io/seata/rm/datasource/sql/struct/TableMetaCacheFactory.java

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,57 @@
1515
*/
1616
package io.seata.rm.datasource.sql.struct;
1717

18+
import java.sql.Connection;
1819
import java.util.Map;
20+
import java.util.concurrent.BlockingQueue;
1921
import java.util.concurrent.ConcurrentHashMap;
22+
import java.util.concurrent.Executor;
23+
import java.util.concurrent.LinkedBlockingQueue;
24+
import java.util.concurrent.ThreadPoolExecutor;
25+
import java.util.concurrent.TimeUnit;
2026

27+
import io.seata.common.ConfigurationKeys;
2128
import io.seata.common.loader.EnhancedServiceLoader;
29+
import io.seata.common.thread.NamedThreadFactory;
2230
import io.seata.common.util.CollectionUtils;
31+
import io.seata.config.ConfigurationFactory;
32+
import io.seata.rm.datasource.DataSourceProxy;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
import static io.seata.common.DefaultValues.DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE;
37+
import static io.seata.common.DefaultValues.DEFAULT_TABLE_META_CHECKER_INTERVAL;
2338

2439
/**
40+
* Table meta cache factory
41+
*
2542
* @author guoyao
2643
*/
2744
public class TableMetaCacheFactory {
2845

46+
private static final Logger LOGGER = LoggerFactory.getLogger(TableMetaCacheFactory.class);
47+
2948
private static final Map<String, TableMetaCache> TABLE_META_CACHE_MAP = new ConcurrentHashMap<>();
3049

50+
private static final Map<String, TableMetaRefreshHolder> TABLE_META_REFRESH_HOLDER_MAP = new ConcurrentHashMap<>();
51+
52+
private static final long TABLE_META_REFRESH_INTERVAL_TIME = 1000L;
53+
54+
private static final int MAX_QUEUE_SIZE = 2000;
55+
56+
/**
57+
* Enable the table meta checker
58+
*/
59+
private static boolean ENABLE_TABLE_META_CHECKER_ENABLE = ConfigurationFactory.getInstance()
60+
.getBoolean(ConfigurationKeys.CLIENT_TABLE_META_CHECK_ENABLE, DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE);
61+
62+
/**
63+
* Table meta checker interval
64+
*/
65+
private static final long TABLE_META_CHECKER_INTERVAL = ConfigurationFactory.getInstance()
66+
.getLong(ConfigurationKeys.CLIENT_TABLE_META_CHECKER_INTERVAL, DEFAULT_TABLE_META_CHECKER_INTERVAL);
67+
68+
3169
/**
3270
* get table meta cache
3371
*
@@ -38,4 +76,70 @@ public static TableMetaCache getTableMetaCache(String dbType) {
3876
return CollectionUtils.computeIfAbsent(TABLE_META_CACHE_MAP, dbType,
3977
key -> EnhancedServiceLoader.load(TableMetaCache.class, dbType));
4078
}
79+
80+
/**
81+
* register table meta
82+
*
83+
* @param dataSourceProxy
84+
*/
85+
public static void registerTableMeta(DataSourceProxy dataSourceProxy) {
86+
TableMetaRefreshHolder holder = new TableMetaRefreshHolder(dataSourceProxy);
87+
TABLE_META_REFRESH_HOLDER_MAP.put(dataSourceProxy.getResourceId(), holder);
88+
}
89+
90+
/**
91+
* public tableMeta refresh event
92+
*/
93+
public static void tableMetaRefreshEvent(String resourceId) {
94+
TableMetaRefreshHolder refreshHolder = TABLE_META_REFRESH_HOLDER_MAP.get(resourceId);
95+
boolean offer = refreshHolder.tableMetaRefreshQueue.offer(System.currentTimeMillis());
96+
if (!offer) {
97+
LOGGER.error("table refresh event offer error:{}", resourceId);
98+
}
99+
}
100+
101+
static class TableMetaRefreshHolder {
102+
private long lastRefreshFinishTime;
103+
private DataSourceProxy dataSource;
104+
private BlockingQueue<Long> tableMetaRefreshQueue;
105+
106+
107+
private final Executor tableMetaRefreshExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
108+
new LinkedBlockingQueue<>(), new NamedThreadFactory("tableMetaRefresh", 1, true));
109+
110+
TableMetaRefreshHolder(DataSourceProxy dataSource) {
111+
this.dataSource = dataSource;
112+
this.lastRefreshFinishTime = System.currentTimeMillis() - TABLE_META_REFRESH_INTERVAL_TIME;
113+
this.tableMetaRefreshQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);
114+
115+
tableMetaRefreshExecutor.execute(() -> {
116+
while (true) {
117+
// 1. check table meta
118+
if (ENABLE_TABLE_META_CHECKER_ENABLE
119+
&& System.currentTimeMillis() - lastRefreshFinishTime > TABLE_META_CHECKER_INTERVAL) {
120+
tableMetaRefreshEvent(dataSource.getResourceId());
121+
}
122+
123+
// 2. refresh table meta
124+
try {
125+
Long eventTime = tableMetaRefreshQueue.take();
126+
// if it has bean refreshed not long ago, skip
127+
if (eventTime - lastRefreshFinishTime > TABLE_META_REFRESH_INTERVAL_TIME) {
128+
try (Connection connection = dataSource.getConnection()) {
129+
TableMetaCache tableMetaCache =
130+
TableMetaCacheFactory.getTableMetaCache(dataSource.getDbType());
131+
tableMetaCache.refresh(connection, dataSource.getResourceId());
132+
}
133+
lastRefreshFinishTime = System.currentTimeMillis();
134+
}
135+
} catch (Exception exx) {
136+
LOGGER.error("table refresh error:{}", exx.getMessage(), exx);
137+
}
138+
}
139+
});
140+
}
141+
142+
143+
144+
}
41145
}

rm-datasource/src/main/java/io/seata/rm/datasource/sql/struct/TableRecords.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import javax.sql.rowset.serial.SerialJavaObject;
3535
import javax.sql.rowset.serial.SerialRef;
3636
import io.seata.common.exception.ShouldNeverHappenException;
37+
import io.seata.rm.datasource.exception.TableMetaException;
3738
import io.seata.rm.datasource.sql.serial.SerialArray;
3839
import static io.seata.rm.datasource.exec.oracle.OracleJdbcType.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
3940
import static io.seata.rm.datasource.exec.oracle.OracleJdbcType.TIMESTAMP_WITH_TIME_ZONE;
@@ -196,7 +197,7 @@ public static TableRecords buildRecords(TableMeta tmeta, ResultSet resultSet) th
196197
List<Field> fields = new ArrayList<>(columnCount);
197198
for (int i = 1; i <= columnCount; i++) {
198199
String colName = resultSetMetaData.getColumnName(i);
199-
ColumnMeta col = tmeta.getColumnMeta(colName);
200+
ColumnMeta col = getColumnMeta(tmeta,colName);
200201
int dataType = col.getDataType();
201202
Field field = new Field();
202203
field.setName(col.getColumnName());
@@ -259,6 +260,20 @@ public static TableRecords buildRecords(TableMeta tmeta, ResultSet resultSet) th
259260
return records;
260261
}
261262

263+
/**
264+
* check if the column is null and return
265+
*
266+
* @param tmeta the table meta
267+
* @param colName the column nmae
268+
*/
269+
private static ColumnMeta getColumnMeta(TableMeta tmeta , String colName) throws SQLException {
270+
ColumnMeta col = tmeta.getColumnMeta(colName);
271+
if (col == null) {
272+
throw new TableMetaException(tmeta.getTableName(), colName);
273+
}
274+
return col;
275+
}
276+
262277
/**
263278
* since there is no parameterless constructor for Blob, Clob and NClob just like mysql,
264279
* it needs to be converted to Serial_ type

rm-datasource/src/test/java/io/seata/rm/datasource/DataSourceProxyTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.alibaba.druid.pool.DruidDataSource;
2323
import io.seata.rm.datasource.mock.MockDataSource;
2424
import io.seata.rm.datasource.mock.MockDriver;
25+
import io.seata.rm.datasource.sql.struct.TableMetaCacheFactory;
2526
import org.junit.jupiter.api.Assertions;
2627
import org.junit.jupiter.api.Test;
2728

@@ -44,7 +45,7 @@ public void test_constructor() {
4445
@Test
4546
public void getResourceIdTest() throws SQLException, NoSuchFieldException, IllegalAccessException {
4647
// Disable 'DataSourceProxy.tableMetaExecutor' to prevent unit tests from being affected
47-
Field enableField = DataSourceProxy.class.getDeclaredField("ENABLE_TABLE_META_CHECKER_ENABLE");
48+
Field enableField = TableMetaCacheFactory.class.getDeclaredField("ENABLE_TABLE_META_CHECKER_ENABLE");
4849
enableField.setAccessible(true);
4950
enableField.set(null, false);
5051

0 commit comments

Comments
 (0)