Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
Expand All @@ -48,8 +49,6 @@ abstract class SocketIOWithTimeout {
private long timeout;
private boolean closed = false;

private static SelectorPool selector = new SelectorPool();

/* A timeout value of 0 implies wait for ever.
* We should have a value of timeout that implies zero wait.. i.e.
* read or write returns immediately.
Expand Down Expand Up @@ -154,7 +153,7 @@ int doIO(ByteBuffer buf, int ops) throws IOException {
//now wait for socket to be ready.
int count = 0;
try {
count = selector.select(channel, ops, timeout);
count = SelectorPool.select(channel, ops, timeout);
} catch (IOException e) { //unexpected IOException.
closed = true;
throw e;
Expand Down Expand Up @@ -200,7 +199,7 @@ static void connect(SocketChannel channel,
// we might have to call finishConnect() more than once
// for some channels (with user level protocols)

int ret = selector.select((SelectableChannel)channel,
int ret = SelectorPool.select((SelectableChannel)channel,
SelectionKey.OP_CONNECT, timeoutLeft);

if (ret > 0 && channel.finishConnect()) {
Expand Down Expand Up @@ -242,7 +241,7 @@ static void connect(SocketChannel channel,
*/
void waitForIO(int ops) throws IOException {

if (selector.select(channel, ops, timeout) == 0) {
if (SelectorPool.select(channel, ops, timeout) == 0) {
throw new SocketTimeoutException(timeoutExceptionString(channel, timeout,
ops));
}
Expand Down Expand Up @@ -283,9 +282,14 @@ private static String timeoutExceptionString(SelectableChannel channel,
private static class SelectorPool {

private static class SelectorInfo {
Selector selector;
long lastActivityTime;
LinkedList<SelectorInfo> queue;
private SelectorProvider provider;
private Selector selector;
private long lastActivityTime;

private SelectorInfo(SelectorProvider provider, Selector selector) {
this.provider = provider;
this.selector = selector;
}

void close() {
if (selector != null) {
Expand All @@ -298,16 +302,11 @@ void close() {
}
}

private static class ProviderInfo {
SelectorProvider provider;
LinkedList<SelectorInfo> queue; // lifo
ProviderInfo next;
}
private static ConcurrentHashMap<SelectorProvider, ConcurrentLinkedDeque<SelectorInfo>>
providerMap = new ConcurrentHashMap<>();

private static final long IDLE_TIMEOUT = 10 * 1000; // 10 seconds.

private ProviderInfo providerList = null;

/**
* Waits on the channel with the given timeout using one of the
* cached selectors. It also removes any cached selectors that are
Expand All @@ -319,7 +318,7 @@ private static class ProviderInfo {
* @return
* @throws IOException
*/
int select(SelectableChannel channel, int ops, long timeout)
static int select(SelectableChannel channel, int ops, long timeout)
throws IOException {

SelectorInfo info = get(channel);
Expand Down Expand Up @@ -385,35 +384,17 @@ int select(SelectableChannel channel, int ops, long timeout)
* @return
* @throws IOException
*/
private synchronized SelectorInfo get(SelectableChannel channel)
throws IOException {
SelectorInfo selInfo = null;

private static SelectorInfo get(SelectableChannel channel) throws IOException {
SelectorProvider provider = channel.provider();

// pick the list : rarely there is more than one provider in use.
ProviderInfo pList = providerList;
while (pList != null && pList.provider != provider) {
pList = pList.next;
}
if (pList == null) {
//LOG.info("Creating new ProviderInfo : " + provider.toString());
pList = new ProviderInfo();
pList.provider = provider;
pList.queue = new LinkedList<SelectorInfo>();
pList.next = providerList;
providerList = pList;
}

LinkedList<SelectorInfo> queue = pList.queue;

if (queue.isEmpty()) {
ConcurrentLinkedDeque<SelectorInfo> selectorQ = providerMap.computeIfAbsent(
provider, k -> new ConcurrentLinkedDeque<>());

SelectorInfo selInfo = selectorQ.pollLast(); // last in first out
if (selInfo == null) {
Selector selector = provider.openSelector();
selInfo = new SelectorInfo();
selInfo.selector = selector;
selInfo.queue = queue;
} else {
selInfo = queue.removeLast();
// selInfo will be put into selectorQ after `#release()`
selInfo = new SelectorInfo(provider, selector);
}

trimIdleSelectors(Time.now());
Expand All @@ -426,34 +407,44 @@ private synchronized SelectorInfo get(SelectableChannel channel)
*
* @param info
*/
private synchronized void release(SelectorInfo info) {
private static void release(SelectorInfo info) {
long now = Time.now();
trimIdleSelectors(now);
info.lastActivityTime = now;
info.queue.addLast(info);
// SelectorInfos in queue are sorted by lastActivityTime
providerMap.get(info.provider).addLast(info);
}

private static AtomicBoolean trimming = new AtomicBoolean(false);
private static volatile long lastTrimTime = Time.now();

/**
* Closes selectors that are idle for IDLE_TIMEOUT (10 sec). It does not
* traverse the whole list, just over the one that have crossed
* the timeout.
*/
private void trimIdleSelectors(long now) {
private static void trimIdleSelectors(long now) {
if (!trimming.compareAndSet(false, true)) {
return;
}
if (now - lastTrimTime < IDLE_TIMEOUT / 2) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it IDLE_TIMEOUT/2 here?
Maybe idle time is near 1.5 times IDLE_TIMEOUT in extreme cases。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ferhui I remove this check in new commit.

trimming.set(false);
return;
}

long cutoff = now - IDLE_TIMEOUT;

for(ProviderInfo pList=providerList; pList != null; pList=pList.next) {
if (pList.queue.isEmpty()) {
continue;
}
for(Iterator<SelectorInfo> it = pList.queue.iterator(); it.hasNext();) {
SelectorInfo info = it.next();
if (info.lastActivityTime > cutoff) {
for (ConcurrentLinkedDeque<SelectorInfo> selectorQ: providerMap.values()) {
SelectorInfo oldest;
while ((oldest = selectorQ.peekFirst()) != null) {
if (oldest.lastActivityTime <= cutoff && selectorQ.remove(oldest)) {
oldest.close();
} else {
break;
}
it.remove();
info.close();
}
}
lastTrimTime = now;
trimming.set(false);
}
}
}