|
46 | 46 | import java.util.Set; |
47 | 47 | import java.util.TreeSet; |
48 | 48 | import java.util.UUID; |
| 49 | +import java.util.concurrent.atomic.AtomicReference; |
49 | 50 | import java.util.concurrent.TimeUnit; |
50 | 51 |
|
51 | 52 | import org.apache.commons.io.FileUtils; |
@@ -190,10 +191,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { |
190 | 191 | * HBaseTestingUtility*/ |
191 | 192 | private Path dataTestDirOnTestFS = null; |
192 | 193 |
|
193 | | - /** |
194 | | - * Shared cluster connection. |
195 | | - */ |
196 | | - private volatile Connection connection; |
| 194 | + private final AtomicReference<Connection> connectionRef = new AtomicReference<>(); |
197 | 195 |
|
198 | 196 | /** |
199 | 197 | * System property key to get test directory value. |
@@ -1170,10 +1168,6 @@ public MiniHBaseCluster getMiniHBaseCluster() { |
1170 | 1168 | */ |
1171 | 1169 | public void shutdownMiniCluster() throws Exception { |
1172 | 1170 | LOG.info("Shutting down minicluster"); |
1173 | | - if (this.connection != null && !this.connection.isClosed()) { |
1174 | | - this.connection.close(); |
1175 | | - this.connection = null; |
1176 | | - } |
1177 | 1171 | shutdownMiniHBaseCluster(); |
1178 | 1172 | if (!this.passedZkCluster){ |
1179 | 1173 | shutdownMiniZKCluster(); |
@@ -1203,10 +1197,7 @@ public boolean cleanupTestDir() { |
1203 | 1197 | * @throws IOException |
1204 | 1198 | */ |
1205 | 1199 | public void shutdownMiniHBaseCluster() throws IOException { |
1206 | | - if (hbaseAdmin != null) { |
1207 | | - hbaseAdmin.close0(); |
1208 | | - hbaseAdmin = null; |
1209 | | - } |
| 1200 | + closeConnection(); |
1210 | 1201 |
|
1211 | 1202 | // unset the configuration for MIN and MAX RS to start |
1212 | 1203 | conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1); |
@@ -3020,16 +3011,26 @@ public HBaseCluster getHBaseClusterInterface() { |
3020 | 3011 | } |
3021 | 3012 |
|
3022 | 3013 | /** |
3023 | | - * Get a Connection to the cluster. |
3024 | | - * Not thread-safe (This class needs a lot of work to make it thread-safe). |
| 3014 | + * Get a shared Connection to the cluster. |
| 3015 | + * this method is threadsafe. |
3025 | 3016 | * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster. |
3026 | 3017 | * @throws IOException |
3027 | 3018 | */ |
3028 | 3019 | public Connection getConnection() throws IOException { |
3029 | | - if (this.connection == null) { |
3030 | | - this.connection = ConnectionFactory.createConnection(this.conf); |
| 3020 | + Connection connection = this.connectionRef.get(); |
| 3021 | + while (connection == null) { |
| 3022 | + connection = ConnectionFactory.createConnection(this.conf); |
| 3023 | + if (! this.connectionRef.compareAndSet(null, connection)) { |
| 3024 | + try { |
| 3025 | + connection.close(); |
| 3026 | + } catch (IOException exception) { |
| 3027 | + LOG.debug("Ignored failure while closing connection on contended connection creation.", |
| 3028 | + exception); |
| 3029 | + } |
| 3030 | + connection = this.connectionRef.get(); |
| 3031 | + } |
3031 | 3032 | } |
3032 | | - return this.connection; |
| 3033 | + return connection; |
3033 | 3034 | } |
3034 | 3035 |
|
3035 | 3036 | /** |
@@ -3067,6 +3068,25 @@ private synchronized void close0() throws IOException { |
3067 | 3068 | } |
3068 | 3069 | } |
3069 | 3070 |
|
| 3071 | + public void closeConnection() throws IOException { |
| 3072 | + if (hbaseAdmin != null) { |
| 3073 | + try { |
| 3074 | + hbaseAdmin.close0(); |
| 3075 | + } catch (IOException exception) { |
| 3076 | + LOG.debug("Ignored failure while closing admin.", exception); |
| 3077 | + } |
| 3078 | + hbaseAdmin = null; |
| 3079 | + } |
| 3080 | + Connection connection = this.connectionRef.getAndSet(null); |
| 3081 | + if (connection != null) { |
| 3082 | + try { |
| 3083 | + connection.close(); |
| 3084 | + } catch (IOException exception) { |
| 3085 | + LOG.debug("Ignored failure while closing connection.", exception); |
| 3086 | + } |
| 3087 | + } |
| 3088 | + } |
| 3089 | + |
3070 | 3090 | /** |
3071 | 3091 | * Returns a ZooKeeperWatcher instance. |
3072 | 3092 | * This instance is shared between HBaseTestingUtility instance users. |
@@ -3240,7 +3260,7 @@ public String explainTableAvailability(TableName tableName) throws IOException { |
3240 | 3260 | .getRegionAssignments(); |
3241 | 3261 | final List<Pair<HRegionInfo, ServerName>> metaLocations = |
3242 | 3262 | MetaTableAccessor |
3243 | | - .getTableRegionsAndLocations(getZooKeeperWatcher(), connection, tableName); |
| 3263 | + .getTableRegionsAndLocations(getZooKeeperWatcher(), getConnection(), tableName); |
3244 | 3264 | for (Pair<HRegionInfo, ServerName> metaLocation : metaLocations) { |
3245 | 3265 | HRegionInfo hri = metaLocation.getFirst(); |
3246 | 3266 | ServerName sn = metaLocation.getSecond(); |
|
0 commit comments