Skip to content

Commit 3227e5f

Browse files
authored
fix(sql-execute): add an user config and add concurrent control for session creating (#2020)
1 parent d60a646 commit 3227e5f

7 files changed

Lines changed: 85 additions & 17 deletions

File tree

server/odc-core/src/main/java/com/oceanbase/odc/core/session/BaseConnectionSessionManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ public ConnectionSession start(ConnectionSessionFactory factory) {
5656
try {
5757
doStoreSession(session);
5858
} catch (Throwable e) {
59+
try {
60+
session.expire();
61+
} catch (Exception e1) {
62+
log.warn("Failed to expire session, session storage failed, sessId={}", session.getId(), e1);
63+
}
5964
log.warn("Failed to store a session, session={}", session, e);
6065
try {
6166
onCreateFailed(session, e);

server/odc-service/src/main/java/com/oceanbase/odc/service/dispatch/RequestDispatcher.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,11 @@ public DispatchResponse forward(@NonNull String ip, @NonNull Integer port) throw
7777
}
7878

7979
public DispatchResponse forward(@NonNull String ip, @NonNull Integer port, HttpServletRequest request,
80-
ByteArrayOutputStream requestBody)
81-
throws IOException {
80+
ByteArrayOutputStream requestBody) throws IOException {
8281
String hostUrl = getHostUrl(ip, port);
8382
return forward(hostUrl, request, requestBody);
8483
}
8584

86-
8785
private DispatchResponse forward(String hostUrl, HttpServletRequest request, ByteArrayOutputStream requestBody)
8886
throws IOException {
8987
Verify.notNull(request, "HttpServletRequest");
@@ -96,7 +94,6 @@ private DispatchResponse forward(String hostUrl, HttpServletRequest request, Byt
9694
return forward(hostUrl, method, requestUrl, headers, requestBody.toByteArray());
9795
}
9896

99-
10097
public String getHostUrl(@NonNull String ip, @NonNull Integer port) {
10198
return String.format("%s://%s:%s", PROTOCAL, ip, port);
10299
}
@@ -118,8 +115,6 @@ public String getRequestUrlByRequest(HttpServletRequest request) {
118115
return uriBuilder.toString();
119116
}
120117

121-
122-
123118
public DispatchResponse forward(@NonNull String hostUrl, @NonNull HttpMethod method,
124119
@NonNull String requestUri, @NonNull HttpHeaders headers, byte[] requestBody) throws IOException {
125120
verifyAndReduceTtl(headers);

server/odc-service/src/main/java/com/oceanbase/odc/service/session/ConnectConsoleService.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,6 @@ private List<SqlTuple> generateSqlTuple(List<OffsetString> sqls, ConnectionSessi
447447
}
448448
if (Objects.isNull(request.getQueryLimit())
449449
|| !request.ifAddROWID()
450-
|| !sessionProperties.isAddInternalRowId()
451450
|| session.getDialectType() != DialectType.OB_ORACLE) {
452451
return target;
453452
}

server/odc-service/src/main/java/com/oceanbase/odc/service/session/ConnectSessionService.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@
2525
import java.util.Objects;
2626
import java.util.Optional;
2727
import java.util.Set;
28+
import java.util.concurrent.ConcurrentHashMap;
2829
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.locks.Lock;
31+
import java.util.concurrent.locks.ReentrantLock;
2932

3033
import javax.annotation.PostConstruct;
3134
import javax.annotation.PreDestroy;
@@ -146,9 +149,9 @@ public class ConnectSessionService {
146149
private CloudMetadataClient cloudMetadataClient;
147150
@Autowired
148151
private UserConfigFacade userConfigFacade;
149-
150152
@Autowired
151153
private StateHostGenerator stateHostGenerator;
154+
private final Map<String, Lock> sessionId2Lock = new ConcurrentHashMap<>();
152155

153156
@PostConstruct
154157
public void init() {
@@ -158,6 +161,7 @@ public void init() {
158161
this.connectionSessionManager = new DefaultConnectionSessionManager(
159162
new DefaultTaskManager("connection-session-management"), repository);
160163
this.connectionSessionManager.addListener(new SessionLimitListener(limitService));
164+
this.connectionSessionManager.addListener(new SessionLockRemoveListener(this.sessionId2Lock));
161165
this.connectionSessionManager.enableAsyncRefreshSessionManager();
162166
this.connectionSessionManager.addSessionValidator(
163167
new SessionValidatorPredicate(sessionProperties.getTimeoutMins(), TimeUnit.MINUTES));
@@ -333,9 +337,25 @@ public ConnectionSession nullSafeGet(@NotNull String sessionId, boolean autoCrea
333337
if (!autoCreate || !StringUtils.equals(req.getFrom(), stateHostGenerator.getHost())) {
334338
throw new NotFoundException(ResourceType.ODC_SESSION, "ID", sessionId);
335339
}
336-
session = create(req);
337-
ConnectionSessionUtil.setConsoleSessionResetFlag(session, true);
338-
return session;
340+
Lock lock = this.sessionId2Lock.computeIfAbsent(sessionId, s -> new ReentrantLock());
341+
try {
342+
if (!lock.tryLock(10, TimeUnit.SECONDS)) {
343+
throw new IllegalStateException("Session is creating, please wait and retry later");
344+
}
345+
} catch (Exception e) {
346+
throw new IllegalStateException(e);
347+
}
348+
try {
349+
session = connectionSessionManager.getSession(sessionId);
350+
if (session != null) {
351+
return session;
352+
}
353+
session = create(req);
354+
ConnectionSessionUtil.setConsoleSessionResetFlag(session, true);
355+
return session;
356+
} finally {
357+
lock.unlock();
358+
}
339359
}
340360
if (!Objects.equals(ConnectionSessionUtil.getUserId(session), authenticationFacade.currentUserId())) {
341361
throw new NotFoundException(ResourceType.ODC_SESSION, "ID", sessionId);
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright (c) 2023 OceanBase.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.oceanbase.odc.service.session;
17+
18+
import java.util.Map;
19+
import java.util.concurrent.locks.Lock;
20+
21+
import com.oceanbase.odc.core.session.ConnectionSession;
22+
import com.oceanbase.odc.core.session.DefaultSessionEventListener;
23+
24+
/**
25+
* {@link SessionLockRemoveListener}
26+
*
27+
* @author yh263208
28+
* @date 2024-03-22 16:59
29+
* @since ODC_release_4.2.4
30+
* @see DefaultSessionEventListener
31+
*/
32+
public class SessionLockRemoveListener extends DefaultSessionEventListener {
33+
34+
private final Map<String, Lock> sessionId2Lock;
35+
36+
public SessionLockRemoveListener(Map<String, Lock> sessionId2Lock) {
37+
this.sessionId2Lock = sessionId2Lock;
38+
}
39+
40+
@Override
41+
public void onExpire(ConnectionSession session) {
42+
this.sessionId2Lock.remove(session.getId());
43+
}
44+
45+
}

server/odc-service/src/main/java/com/oceanbase/odc/service/session/SessionProperties.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,6 @@ public class SessionProperties {
116116
@Value("${odc.session.timeout-mins:480}")
117117
private long timeoutMins = 480;
118118

119-
/**
120-
* 用于控制用户在 SQL 执行时是否改写增加 rowid 列
121-
*/
122-
@Value("${odc.session.sql-execute.add-internal-rowid:true}")
123-
private boolean addInternalRowId = true;
124-
125119
/**
126120
* Timeout for querying full link trace
127121
*/

server/odc-service/src/main/resources/config-meta/user-config-meta.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,3 +161,13 @@
161161
default_value: 'false'
162162
nullable: false
163163
description: ''
164+
- id: 17
165+
category: 'user'
166+
key: 'odc.sqlexecute.default.addInternalRowId'
167+
type: 'boolean'
168+
max_value: ~
169+
min_value: ~
170+
allowed_values: [ 'true','false' ]
171+
default_value: 'true'
172+
nullable: false
173+
description: 'Whether the query is being rewritten in ob-oracle mode and increases the RowId.'

0 commit comments

Comments
 (0)