Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions gateway-ha/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>7.0.2</version>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor-v3</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,32 @@ public class DataStoreConfiguration
private String driver;
private Integer queryHistoryHoursRetention = 4;
private boolean runMigrationsEnabled = true;
private Integer maxPoolSize;

public DataStoreConfiguration(String jdbcUrl, String user, String password, String driver, Integer queryHistoryHoursRetention, boolean runMigrationsEnabled)
public DataStoreConfiguration(String jdbcUrl, String user, String password, String driver, Integer queryHistoryHoursRetention, boolean runMigrationsEnabled, Integer maxPoolSize)
{
this.jdbcUrl = jdbcUrl;
this.user = user;
this.password = password;
this.driver = driver;
this.queryHistoryHoursRetention = queryHistoryHoursRetention;
this.runMigrationsEnabled = runMigrationsEnabled;
this.maxPoolSize = maxPoolSize;
}

public DataStoreConfiguration(String jdbcUrl, String user, String password, String driver, Integer queryHistoryHoursRetention, boolean runMigrationsEnabled)
{
this(jdbcUrl, user, password, driver, queryHistoryHoursRetention, runMigrationsEnabled, null);
}

public Integer getMaxPoolSize()
{
return this.maxPoolSize;
}

public void setMaxPoolSize(Integer maxPoolSize)
{
this.maxPoolSize = maxPoolSize;
}

public DataStoreConfiguration() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package io.trino.gateway.ha.persistence;

import com.google.common.annotations.VisibleForTesting;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import io.airlift.log.Logger;
import io.trino.gateway.ha.config.DataStoreConfiguration;
import io.trino.gateway.ha.persistence.dao.QueryHistoryDao;
Expand All @@ -24,6 +26,8 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -39,6 +43,8 @@ public class JdbcConnectionManager
private final ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();

private final Map<String, HikariDataSource> pools = new ConcurrentHashMap<>();

public JdbcConnectionManager(Jdbi jdbi, DataStoreConfiguration configuration)
{
this.jdbi = requireNonNull(jdbi, "jdbi is null")
Expand All @@ -59,7 +65,18 @@ public Jdbi getJdbi(@Nullable String routingGroupDatabase)
return jdbi;
}

return Jdbi.create(buildJdbcUrl(routingGroupDatabase), configuration.getUser(), configuration.getPassword())
Integer maxPoolSize = configuration.getMaxPoolSize();
if (maxPoolSize != null && maxPoolSize > 0) {
HikariDataSource ds = getOrCreateDataSource(routingGroupDatabase, maxPoolSize);
return Jdbi.create(ds)
.installPlugin(new SqlObjectPlugin())
.registerRowMapper(new RecordAndAnnotatedConstructorMapper());
}

return Jdbi.create(
buildJdbcUrl(routingGroupDatabase),
configuration.getUser(),
configuration.getPassword())
.installPlugin(new SqlObjectPlugin())
.registerRowMapper(new RecordAndAnnotatedConstructorMapper());
}
Expand Down Expand Up @@ -107,11 +124,51 @@ private void startCleanUps()
executorService.scheduleWithFixedDelay(
() -> {
log.info("Performing query history cleanup task");
long created = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(this.configuration.getQueryHistoryHoursRetention());
long created = System.currentTimeMillis()
- TimeUnit.HOURS.toMillis(this.configuration.getQueryHistoryHoursRetention());
jdbi.onDemand(QueryHistoryDao.class).deleteOldHistory(created);
},
1,
120,
TimeUnit.MINUTES);
}

private HikariDataSource getOrCreateDataSource(String routingGroupDatabase, int maxPoolSize)
{
return pools.compute(routingGroupDatabase, (key, existing) -> {
if (existing != null && !existing.isClosed()) {
return existing;
}

HikariConfig cfg = new HikariConfig();
cfg.setJdbcUrl(buildJdbcUrl(key));
cfg.setUsername(configuration.getUser());
cfg.setPassword(configuration.getPassword());
if (configuration.getDriver() != null) {
cfg.setDriverClassName(configuration.getDriver());
}
cfg.setMaximumPoolSize(maxPoolSize);
cfg.setPoolName("gateway-ha-" + key);

return new HikariDataSource(cfg);
});
}

public void close()
{
for (Map.Entry<String, HikariDataSource> e : pools.entrySet()) {
HikariDataSource ds = e.getValue();
if (ds != null && !ds.isClosed()) {
try {
ds.close();
}
catch (RuntimeException ex) {
log.warn(ex, "Failed to close datasource for key: %s", e.getKey());
}
}
}
pools.clear();

executorService.shutdownNow();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.persistence;

import io.trino.gateway.ha.config.DataStoreConfiguration;
import org.jdbi.v3.core.Jdbi;
import org.junit.jupiter.api.Test;

import java.nio.file.Path;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.assertj.core.api.Assertions.assertThat;

final class TestJdbcConnectionManagerPool
{
@Test
void blocksWhenExceedingMaxPoolSize()
throws Exception
{
String dbPath = Path.of(System.getProperty("java.io.tmpdir"), "h2db-pool-" + System.currentTimeMillis()).toString();
String jdbcUrl = "jdbc:h2:" + dbPath;

DataStoreConfiguration cfg = new DataStoreConfiguration(
jdbcUrl, "sa", "sa", "org.h2.Driver",
4, true,
2);

JdbcConnectionManager cm = new JdbcConnectionManager(Jdbi.create(jdbcUrl, "sa", "sa"), cfg);
Jdbi jdbi = cm.getJdbi("testdb");

try (ExecutorService es = Executors.newFixedThreadPool(3)) {
List<Future<Connection>> acquired = new ArrayList<>();

CountDownLatch hold = new CountDownLatch(1);
CountDownLatch acquiredLatch = new CountDownLatch(2);

// Open exactly maxPoolSize connections and keep them open
for (int i = 0; i < 2; i++) {
acquired.add(es.submit(() -> {
try (var h = jdbi.open()) {
acquiredLatch.countDown();
boolean released = hold.await(10, TimeUnit.SECONDS);
assertThat(released).as("hold latch should be released by the test").isTrue();
}
return null;
}));
}

// Wait until both connections are actually acquired (avoid race)
boolean bothAcquired = acquiredLatch.await(3, TimeUnit.SECONDS);
assertThat(bothAcquired).as("both connections should be acquired before third attempt").isTrue();

// Third attempt should block since the pool is full
Future<Boolean> third = es.submit(() -> {
var h = jdbi.open();
h.close();
return true;
});

boolean completedIn200ms = false;
try {
third.get(200, TimeUnit.MILLISECONDS);
completedIn200ms = true; // if this happens when connection was not blocked, which is wrong
}
catch (TimeoutException expected) {
// expected, means the request was blocked on the pool
}

assertThat(completedIn200ms)
.as("third getJdbi().open() should be blocked by maxPoolSize=2")
.isFalse();

// Release the first two connections, the third one should complete now
hold.countDown();
assertThat(third.get(3, TimeUnit.SECONDS)).isTrue();

// Wait for the first two to finish gracefully
for (Future<Connection> f : acquired) {
f.get(3, TimeUnit.SECONDS);
}
}
}

@Test
void doesNotBlockWhenMaxPoolSizeIsNull()
throws Exception
{
String dbPath = Path.of(System.getProperty("java.io.tmpdir"), "h2db-nopool-" + System.currentTimeMillis()).toString();
String jdbcUrl = "jdbc:h2:" + dbPath;

// maxPoolSize == null -> no pool path
DataStoreConfiguration cfg = new DataStoreConfiguration(
jdbcUrl, "sa", "sa", "org.h2.Driver",
4, true);

JdbcConnectionManager cm = new JdbcConnectionManager(Jdbi.create(jdbcUrl, "sa", "sa"), cfg);
Jdbi jdbi = cm.getJdbi("testdb");

try (ExecutorService es = Executors.newFixedThreadPool(3)) {
try {
CountDownLatch hold = new CountDownLatch(1);
CountDownLatch acquiredLatch = new CountDownLatch(2);

// Open two connections and keep them open
for (int i = 0; i < 2; i++) {
es.submit(() -> {
try (var h = jdbi.open()) {
acquiredLatch.countDown();
boolean released = hold.await(10, TimeUnit.SECONDS);
assertThat(released).isTrue();
}
return null;
});
}

// Wait until both connections are really open (avoid race conditions)
boolean bothAcquired = acquiredLatch.await(3, TimeUnit.SECONDS);
assertThat(bothAcquired).isTrue();

// Third connection attempt should NOT block since no pool is used
Future<Boolean> third = es.submit(() -> {
var h = jdbi.open();
h.close();
return true;
});

boolean completedIn200ms;
try {
third.get(200, TimeUnit.MILLISECONDS);
completedIn200ms = true; // not blocked - expected behavior
}
catch (TimeoutException ignore) {
completedIn200ms = false; // blocked - incorrect for no-pool case
}

assertThat(completedIn200ms)
.as("third getJdbi().open() should NOT block when no pool is configured")
.isTrue();

// check H2 session count to confirm multiple physical connections were opened
int sessions = jdbi.withHandle(h ->
h.createQuery("SELECT COUNT(*) FROM INFORMATION_SCHEMA.SESSIONS")
.mapTo(int.class)
.one());
assertThat(sessions).isGreaterThanOrEqualTo(3);

// Release the first two connections
hold.countDown();
assertThat(third.get(3, TimeUnit.SECONDS)).isTrue();
}
finally {
es.shutdownNow();
}
}
}
}