Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### :magic_wand: Added
- Added support of Global Databases including and Global Database endpoint. ([PR #1573](https://github.com/aws/aws-advanced-jdbc-wrapper/pull/1573)).

## [2.6.8] - 2025-12-04

### :bug: Fixed
- Wait timeout for custom endpoint info ([PR #1616](https://github.com/aws/aws-advanced-jdbc-wrapper/pull/1616))
- Use custom endpoint as cluster ID to prevent accumulating of connections if using a custom endpoint in a connection string ([PR #1619](https://github.com/aws/aws-advanced-jdbc-wrapper/pull/1619))

### :crab: Changed
- Improve exception handling when DB server reaches maximum available connections ([PR #1621](https://github.com/aws/aws-advanced-jdbc-wrapper/pull/1621))
- Improve connection management to prevent connection leaking in topology monitor ([PR #1624](https://github.com/aws/aws-advanced-jdbc-wrapper/pull/1624))

## [2.6.7] - 2025-11-25

### :bug: Fixed
Expand Down Expand Up @@ -582,6 +593,9 @@ The Amazon Web Services (AWS) Advanced JDBC Driver allows an application to take
- The [AWS IAM Authentication Connection Plugin](./docs/using-the-jdbc-driver/using-plugins/UsingTheIamAuthenticationPlugin.md)
- The [AWS Secrets Manager Connection Plugin](./docs/using-the-jdbc-driver/using-plugins/UsingTheAwsSecretsManagerPlugin.md)

[2.6.8]: https://github.com/aws/aws-advanced-jdbc-wrapper/compare/2.6.7...2.6.8
[2.6.7]: https://github.com/aws/aws-advanced-jdbc-wrapper/compare/2.6.6...2.6.7
[2.6.6]: https://github.com/aws/aws-advanced-jdbc-wrapper/compare/2.6.5...2.6.6
[2.6.5]: https://github.com/aws/aws-advanced-jdbc-wrapper/compare/2.6.4...2.6.5
[2.6.4]: https://github.com/aws/aws-advanced-jdbc-wrapper/compare/2.6.3...2.6.4
[2.6.3]: https://github.com/aws/aws-advanced-jdbc-wrapper/compare/2.6.2...2.6.3
Expand Down
3 changes: 2 additions & 1 deletion Maintenance.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
| October 16, 2025 | [Release 2.6.5](https://github.com/aws/aws-advanced-jdbc-wrapper/releases/tag/2.6.5) |
| November 5, 2025 | [Release 2.6.6](https://github.com/aws/aws-advanced-jdbc-wrapper/releases/tag/2.6.6) |
| November 25, 2025 | [Release 2.6.7](https://github.com/aws/aws-advanced-jdbc-wrapper/releases/tag/2.6.7) |
| December 4, 2025 | [Release 2.6.8](https://github.com/aws/aws-advanced-jdbc-wrapper/releases/tag/2.6.8) |

`aws-advanced-jdbc-wrapper` [follows semver](https://semver.org/#semantic-versioning-200) which means we will only
release breaking changes in major versions. Generally speaking patches will be released to fix existing problems without
Expand Down Expand Up @@ -93,5 +94,5 @@ from the updated source after the PRs are merged.
| Major Version | Latest Minor Version | Status | Initial Release | Maintenance Window Start | Maintenance Window End |
|---------------|----------------------|-------------|-----------------|--------------------------|------------------------|
| 1 | 1.0.2 | Maintenance | Oct 5, 2022 | Apr 28, 2023 | Apr 28, 2024 |
| 2 | 2.6.7 | Maintenance | Jan 1, 2026 | Dec 31, 2026 | N/A |
| 2 | 2.6.8 | Maintenance | Apr 28, 2023 | Dec 31, 2026 | N/A |
| 3 | 3.0.0 | Current | Dec 12, 2025 | N/A | N/A |
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public boolean isNetworkException(final String sqlState) {
return false;
}

return sqlState.startsWith("08");
// 08004 - "Server rejected the connection" or "Connection refused by server"
return sqlState.startsWith("08") && !"08004".equals(sqlState);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public class PgExceptionHandler extends AbstractPgExceptionHandler {

// The following SQL States for Postgresql are considered as "communication" errors
private static final List<String> NETWORK_ERRORS = Arrays.asList(
"53", // insufficient resources
"57P01", // admin shutdown
"57P02", // crash shutdown
"57P03", // cannot connect now
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ public GlobalAuroraTopologyUtils(GlobalAuroraTopologyDialect dialect, HostSpecBu
public @Nullable List<HostSpec> queryForTopology(
Connection conn, HostSpec initialHostSpec, Map<String, HostSpec> instanceTemplatesByRegion)
throws SQLException {
int originalNetworkTimeout = setNetworkTimeout(conn);
final Pair<Integer, Boolean> networkTimeoutPair = this.setNetworkTimeout(conn);
int originalNetworkTimeout = networkTimeoutPair.getValue1();
boolean timeoutChanged = networkTimeoutPair.getValue2();
try (final Statement stmt = conn.createStatement();
final ResultSet rs = stmt.executeQuery(this.dialect.getTopologyQuery())) {
if (rs.getMetaData().getColumnCount() == 0) {
Expand All @@ -68,7 +70,7 @@ public GlobalAuroraTopologyUtils(GlobalAuroraTopologyDialect dialect, HostSpecBu
} catch (final SQLSyntaxErrorException e) {
throw new SQLException(Messages.get("TopologyUtils.invalidQuery"), e);
} finally {
if (originalNetworkTimeout == 0 && !conn.isClosed()) {
if (timeoutChanged && !conn.isClosed()) {
conn.setNetworkTimeout(networkTimeoutExecutor, originalNetworkTimeout);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ public TopologyUtils(
*/
public @Nullable List<HostSpec> queryForTopology(Connection conn, HostSpec initialHostSpec, HostSpec instanceTemplate)
throws SQLException {
int originalNetworkTimeout = setNetworkTimeout(conn);
final Pair<Integer, Boolean> networkTimeoutPair = this.setNetworkTimeout(conn);
int originalNetworkTimeout = networkTimeoutPair.getValue1();
boolean timeoutChanged = networkTimeoutPair.getValue2();
try (final Statement stmt = conn.createStatement();
final ResultSet rs = stmt.executeQuery(this.dialect.getTopologyQuery())) {
if (rs.getMetaData().getColumnCount() == 0) {
Expand All @@ -84,24 +86,26 @@ public TopologyUtils(
} catch (final SQLSyntaxErrorException e) {
throw new SQLException(Messages.get("TopologyUtils.invalidQuery"), e);
} finally {
if (originalNetworkTimeout == 0 && !conn.isClosed()) {
if (timeoutChanged && !conn.isClosed()) {
conn.setNetworkTimeout(networkTimeoutExecutor, originalNetworkTimeout);
}
}
}

protected int setNetworkTimeout(Connection conn) {
protected Pair<Integer, Boolean> setNetworkTimeout(Connection conn) {
int networkTimeout = -1;
boolean timeoutChanged = false;
try {
networkTimeout = conn.getNetworkTimeout();
// The topology query is not monitored by the EFM plugin, so it needs a socket timeout.
if (networkTimeout == 0) {
conn.setNetworkTimeout(this.networkTimeoutExecutor, DEFAULT_QUERY_TIMEOUT_MS);
timeoutChanged = true;
}
} catch (SQLException e) {
LOGGER.warning(() -> Messages.get("TopologyUtils.errorGettingNetworkTimeout", new Object[] {e.getMessage()}));
}
return networkTimeout;
return Pair.create(networkTimeout, timeoutChanged);
}

protected abstract @Nullable List<HostSpec> getHosts(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
import java.sql.SQLSyntaxErrorException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -73,6 +72,10 @@ public class ClusterTopologyMonitorImpl extends AbstractMonitor implements Clust
protected static final long highRefreshPeriodAfterPanicNano = TimeUnit.SECONDS.toNanos(30);
protected static final long ignoreTopologyRequestNano = TimeUnit.SECONDS.toNanos(10);

private static final long INITIAL_BACKOFF_MS = 100;
private static final long MAX_BACKOFF_MS = 10000;
private static final Random random = new Random();

protected final AtomicReference<HostSpec> writerHostSpec = new AtomicReference<>(null);
protected final AtomicReference<Connection> monitoringConnection = new AtomicReference<>(null);

Expand Down Expand Up @@ -237,7 +240,11 @@ private List<HostSpec> getStoredHosts() {
@Override
public void stop() {
this.nodeThreadsStop.set(true);
this.shutdownNodeExecutorService();

this.closeNodeMonitors();
this.closeConnection(this.nodeThreadsWriterConnection);
this.closeConnection(this.nodeThreadsReaderConnection);
this.closeConnection(this.monitoringConnection);

// This code interrupts the waiting/sleeping cycle in the monitoring thread.
synchronized (this.requestToUpdateTopology) {
Expand All @@ -250,6 +257,7 @@ public void stop() {

@Override
public void close() {
this.closeNodeMonitors();
this.closeConnection(this.monitoringConnection);
this.closeConnection(this.nodeThreadsWriterConnection);
this.closeConnection(this.nodeThreadsReaderConnection);
Expand Down Expand Up @@ -277,8 +285,7 @@ public void monitor() throws Exception {

// Start node monitors.
this.nodeThreadsStop.set(false);
this.nodeThreadsWriterConnection.set(null);
this.nodeThreadsReaderConnection.set(null);
this.nodeThreadConnectionCleanUp();
this.nodeThreadsWriterHostSpec.set(null);
this.nodeThreadsLatestTopology.set(null);

Expand All @@ -288,7 +295,7 @@ public void monitor() throws Exception {
hosts = this.openAnyConnectionAndUpdateTopology();
}

this.shutdownNodeExecutorService();
this.closeNodeMonitors();
this.createNodeExecutorService();

if (hosts != null && !this.isVerifiedWriterConnection) {
Expand Down Expand Up @@ -341,7 +348,7 @@ public void monitor() throws Exception {
}

this.nodeThreadsStop.set(true);
this.shutdownNodeExecutorService();
this.closeNodeMonitors();
this.submittedNodes.clear();

continue;
Expand Down Expand Up @@ -381,7 +388,7 @@ public void monitor() throws Exception {
} else {
// We are in regular mode (not panic mode).
if (!this.submittedNodes.isEmpty()) {
this.shutdownNodeExecutorService();
this.closeNodeMonitors();
this.submittedNodes.clear();
}

Expand All @@ -390,6 +397,7 @@ public void monitor() throws Exception {
// Attempt to fetch topology failed, so we switch to panic mode.
this.closeConnection(this.monitoringConnection);
this.isVerifiedWriterConnection = false;
this.writerHostSpec.set(null);
continue;
}

Expand Down Expand Up @@ -428,11 +436,9 @@ public void monitor() throws Exception {
throw ex;
} finally {
this.stop.set(true);
this.shutdownNodeExecutorService();

final Connection conn = this.monitoringConnection.get();
this.monitoringConnection.set(null);
this.closeConnection(conn);
this.closeNodeMonitors();
this.closeConnection(this.monitoringConnection);
this.nodeThreadConnectionCleanUp();

this.servicesContainer.getEventPublisher().unsubscribe(
this, Collections.singleton(MonitorResetEvent.class));
Expand All @@ -448,14 +454,13 @@ protected void reset() {
new Object[]{this.clusterId, this.initialHostSpec.getHost()}));

this.nodeThreadsStop.set(true);
this.shutdownNodeExecutorService();
this.closeNodeMonitors();
this.nodeThreadConnectionCleanUp();
this.nodeThreadsStop.set(false);
this.submittedNodes.clear();
this.createNodeExecutorService();

this.nodeThreadsWriterConnection.set(null);
this.nodeThreadsWriterHostSpec.set(null);
this.nodeThreadsReaderConnection.set(null);
this.nodeThreadsLatestTopology.set(null);

this.closeConnection(this.monitoringConnection);
Expand Down Expand Up @@ -484,7 +489,20 @@ public void processEvent(Event event) {
}
}

protected void shutdownNodeExecutorService() {
protected void nodeThreadConnectionCleanUp() {
if (this.monitoringConnection.get() != this.nodeThreadsWriterConnection.get()) {
this.closeConnection(this.nodeThreadsWriterConnection);
} else {
this.nodeThreadsWriterConnection.set(null);
}
if (this.monitoringConnection.get() != this.nodeThreadsReaderConnection.get()) {
this.closeConnection(this.nodeThreadsReaderConnection);
} else {
this.nodeThreadsReaderConnection.set(null);
}
}

protected void closeNodeMonitors() {
if (this.nodeExecutorService != null) {

this.nodeExecutorLock.lock();
Expand All @@ -507,6 +525,8 @@ protected void shutdownNodeExecutorService() {
}

this.nodeExecutorService = null;
this.nodeThreadConnectionCleanUp();

} finally {
this.nodeExecutorLock.unlock();
}
Expand Down Expand Up @@ -600,6 +620,7 @@ protected List<HostSpec> openAnyConnectionAndUpdateTopology() {
// Attempt to fetch topology failed. There might be something wrong with the connection, so we close it here.
this.closeConnection(this.monitoringConnection);
this.isVerifiedWriterConnection = false;
this.writerHostSpec.set(null);
}

return hosts;
Expand Down Expand Up @@ -702,6 +723,7 @@ private static class NodeMonitoringWorker implements Runnable {
protected final HostSpec hostSpec;
protected final @Nullable HostSpec writerHostSpec;
protected boolean writerChanged = false;
protected int connectionAttempts = 0;

public NodeMonitoringWorker(
final FullServicesContainer servicesContainer,
Expand Down Expand Up @@ -729,10 +751,25 @@ public void run() {
try {
connection = this.servicesContainer.getPluginService().forceConnect(
hostSpec, this.monitor.monitoringProperties);
this.connectionAttempts = 0;
} catch (SQLException ex) {
// A problem occurred while connecting. We will try again on the next iteration.
TimeUnit.MILLISECONDS.sleep(100);
continue;
// A problem occurred while connecting.
if (this.servicesContainer.getPluginService().isNetworkException(
ex, this.servicesContainer.getPluginService().getTargetDriverDialect())) {
// It's a network issue that's expected during a cluster failover.
// We will try again on the next iteration.
TimeUnit.MILLISECONDS.sleep(100);
continue;
} else if (this.servicesContainer.getPluginService().isLoginException(
ex, this.servicesContainer.getPluginService().getTargetDriverDialect())) {
// Something wrong with login credentials. We can't continue.
throw new RuntimeException(ex);
} else {
// It might be some transient error. Let's try again.
// If the error repeats, we will try again after a longer delay.
TimeUnit.MILLISECONDS.sleep(this.calculateBackoffWithJitter(this.connectionAttempts++));
continue;
}
}
}

Expand Down Expand Up @@ -790,12 +827,10 @@ public void run() {
// be established.
if (updateTopology) {
this.readerThreadFetchTopology(connection, this.writerHostSpec);
} else if (this.monitor.nodeThreadsReaderConnection.get() == null) {
if (this.monitor.nodeThreadsReaderConnection.compareAndSet(null, connection)) {
// Use this connection to update the topology.
updateTopology = true;
this.readerThreadFetchTopology(connection, this.writerHostSpec);
}
} else if (this.monitor.nodeThreadsReaderConnection.compareAndSet(null, connection)) {
// Use this connection to update the topology.
updateTopology = true;
this.readerThreadFetchTopology(connection, this.writerHostSpec);
}
}
}
Expand Down Expand Up @@ -858,5 +893,11 @@ private void readerThreadFetchTopology(final Connection connection, final @Nulla
LOGGER.fine(() -> LogUtils.logTopology(hosts));
}
}

private long calculateBackoffWithJitter(int attempt) {
long backoff = INITIAL_BACKOFF_MS * Math.round(Math.pow(2, Math.min(attempt, 6)));
backoff = Math.min(backoff, MAX_BACKOFF_MS);
return Math.round(backoff * (0.5 + random.nextDouble() * 0.5));
}
}
}
Loading
Loading