Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -159,7 +158,7 @@ public void close() {
}
ConnectionOverAsyncConnection c = this.conn;
if (c != null) {
c.closeConnImpl();
c.closePool();
}
closed = true;
}
Expand Down Expand Up @@ -305,14 +304,7 @@ public Connection toConnection() {
if (c != null) {
return c;
}
try {
c = new ConnectionOverAsyncConnection(this,
ConnectionFactory.createConnectionImpl(conf, null, user));
} catch (IOException e) {
// TODO: finally we will not rely on ConnectionImplementation anymore and there will no
// IOException here.
throw new UncheckedIOException(e);
}
c = new ConnectionOverAsyncConnection(this);
this.conn = c;
}
return c;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,12 @@ class ConnectionOverAsyncConnection implements Connection {

private volatile ExecutorService batchPool = null;

protected final AsyncConnectionImpl conn;

/**
* @deprecated we can not implement all the related stuffs at once so keep it here for now, will
* remove it after we implement all the stuffs, like Admin, RegionLocator, etc.
*/
@Deprecated
private final ConnectionImplementation oldConn;
private final AsyncConnectionImpl conn;

private final ConnectionConfiguration connConf;

ConnectionOverAsyncConnection(AsyncConnectionImpl conn, ConnectionImplementation oldConn) {
ConnectionOverAsyncConnection(AsyncConnectionImpl conn) {
this.conn = conn;
this.oldConn = oldConn;
this.connConf = new ConnectionConfiguration(conn.getConfiguration());
}

Expand Down Expand Up @@ -109,7 +101,7 @@ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws I

@Override
public RegionLocator getRegionLocator(TableName tableName) throws IOException {
return oldConn.getRegionLocator(tableName);
return new RegionLocatorOverAsyncTableRegionLocator(conn.getRegionLocator(tableName));
}

@Override
Expand All @@ -129,7 +121,7 @@ public void close() throws IOException {

// will be called from AsyncConnection, to avoid infinite loop as in the above method we will call
// AsyncConnection.close.
void closeConnImpl() {
void closePool() {
ExecutorService batchPool = this.batchPool;
if (batchPool != null) {
ConnectionUtils.shutdownPool(batchPool);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.util.FutureUtils.get;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;

/**
* The {@link RegionLocator} implementation based on {@link AsyncTableRegionLocator}.
*/
@InterfaceAudience.Private
class RegionLocatorOverAsyncTableRegionLocator implements RegionLocator {

private final AsyncTableRegionLocator locator;

RegionLocatorOverAsyncTableRegionLocator(AsyncTableRegionLocator locator) {
this.locator = locator;
}

@Override
public void close() {
}

@Override
public HRegionLocation getRegionLocation(byte[] row, int replicaId, boolean reload)
throws IOException {
return get(locator.getRegionLocation(row, replicaId, reload));
}

@Override
public List<HRegionLocation> getRegionLocations(byte[] row, boolean reload) throws IOException {
return get(locator.getRegionLocations(row, reload));
}

@Override
public void clearRegionLocationCache() {
locator.clearRegionLocationCache();
}

@Override
public List<HRegionLocation> getAllRegionLocations() throws IOException {
return get(locator.getAllRegionLocations());
}

@Override
public TableName getName() {
return locator.getName();
}

}