Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions core/src/main/java/io/grpc/internal/InUseStateAggregator.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import java.util.HashSet;

import javax.annotation.CheckReturnValue;
import javax.annotation.concurrent.GuardedBy;

/**
Expand All @@ -44,15 +45,18 @@ abstract class InUseStateAggregator<T> {
private final HashSet<T> inUseObjects = new HashSet<T>();

/**
* Update the in-use state of an object. Initially no object is in use.
* Update the in-use state of an object. Initially no object is in use. When the return value is
* non-{@code null}, the caller should execute the runnable after releasing locks.
*/
final void updateObjectInUse(T object, boolean inUse) {
@CheckReturnValue
final Runnable updateObjectInUse(T object, boolean inUse) {
Runnable runnable = null;
synchronized (getLock()) {
int origSize = inUseObjects.size();
if (inUse) {
inUseObjects.add(object);
if (origSize == 0) {
handleInUse();
runnable = handleInUse();
}
} else {
boolean removed = inUseObjects.remove(object);
Expand All @@ -61,8 +65,10 @@ final void updateObjectInUse(T object, boolean inUse) {
}
}
}
return runnable;
}

@CheckReturnValue
final boolean isInUse() {
synchronized (getLock()) {
return !inUseObjects.isEmpty();
Expand All @@ -73,12 +79,13 @@ final boolean isInUse() {

/**
* Called when the aggregated in-use state has changed to true, which means at least one object is
* in use.
* in use. When the return value is non-{@code null}, then the runnable will be executed by the
* caller of {@link #updateObjectInUse} after releasing locks.
*
* <p>This method is called under the lock returned by {@link #getLock}.
*/
@GuardedBy("getLock()")
abstract void handleInUse();
abstract Runnable handleInUse();

/**
* Called when the aggregated in-use state has changed to false, which means no object is in use.
Expand Down
80 changes: 49 additions & 31 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ Object getLock() {

@Override
@GuardedBy("lock")
void handleInUse() {
exitIdleMode();
Runnable handleInUse() {
return exitIdleMode();
}

@GuardedBy("lock")
Expand Down Expand Up @@ -220,32 +220,46 @@ public void run() {
/**
* Make the channel exit idle mode, if it's in it. Return a LoadBalancer that can be used for
* making new requests. Return null if the channel is shutdown.
*
* <p>May be called under the lock.
*/
@VisibleForTesting
LoadBalancer<ClientTransport> exitIdleMode() {
LoadBalancer<ClientTransport> exitIdleModeAndGetLb() {
Runnable runnable;
LoadBalancer<ClientTransport> balancer;
synchronized (lock) {
runnable = exitIdleMode();
balancer = loadBalancer;
}
if (runnable != null) {
runnable.run();
}
return balancer;
}

/**
* Make the channel exit idle mode, if it's in it. If the returned runnable is non-{@code null},
* then it should be executed by the caller after releasing {@code lock}.
*/
@GuardedBy("lock")
private Runnable exitIdleMode() {
final LoadBalancer<ClientTransport> balancer;
final NameResolver resolver;
synchronized (lock) {
if (shutdown) {
return null;
}
if (inUseStateAggregator.isInUse()) {
cancelIdleTimer();
} else {
// exitIdleMode() may be called outside of inUseStateAggregator, which may still in
// "not-in-use" state. If it's the case, we start the timer which will be soon cancelled if
// the aggregator receives actual uses.
rescheduleIdleTimer();
}
if (loadBalancer != null) {
return loadBalancer;
}
balancer = loadBalancerFactory.newLoadBalancer(nameResolver.getServiceAuthority(), tm);
this.loadBalancer = balancer;
resolver = this.nameResolver;
if (shutdown) {
return null;
}
if (inUseStateAggregator.isInUse()) {
cancelIdleTimer();
} else {
// exitIdleMode() may be called outside of inUseStateAggregator, which may still in
// "not-in-use" state. If it's the case, we start the timer which will be soon cancelled if
// the aggregator receives actual uses.
rescheduleIdleTimer();
}
if (loadBalancer != null) {
return null;
}
balancer = loadBalancerFactory.newLoadBalancer(nameResolver.getServiceAuthority(), tm);
this.loadBalancer = balancer;
resolver = this.nameResolver;
class NameResolverStartTask implements Runnable {
@Override
public void run() {
Expand All @@ -255,8 +269,7 @@ public void run() {
}
}

scheduledExecutor.execute(new NameResolverStartTask());
return balancer;
return new NameResolverStartTask();
}

@GuardedBy("lock")
Expand Down Expand Up @@ -294,7 +307,7 @@ private void rescheduleIdleTimer() {
private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
@Override
public ClientTransport get(CallOptions callOptions) {
LoadBalancer<ClientTransport> balancer = exitIdleMode();
LoadBalancer<ClientTransport> balancer = exitIdleModeAndGetLb();
if (balancer == null) {
return SHUTDOWN_TRANSPORT;
}
Expand Down Expand Up @@ -618,13 +631,14 @@ public void onConnectionClosedByServer(Status status) {
}

@Override
public void onInUse(TransportSet ts) {
inUseStateAggregator.updateObjectInUse(ts, true);
public Runnable onInUse(TransportSet ts) {
return inUseStateAggregator.updateObjectInUse(ts, true);
}

@Override
public void onNotInUse(TransportSet ts) {
inUseStateAggregator.updateObjectInUse(ts, false);
Runnable r = inUseStateAggregator.updateObjectInUse(ts, false);
assert r == null;
}
});
if (log.isLoggable(Level.FINE)) {
Expand Down Expand Up @@ -709,13 +723,17 @@ private class InterimTransportImpl implements InterimTransport<ClientTransport>
delayedTransports.remove(delayedTransport);
maybeTerminateChannel();
}
inUseStateAggregator.updateObjectInUse(delayedTransport, false);
Runnable r = inUseStateAggregator.updateObjectInUse(delayedTransport, false);
assert r == null;
}

@Override public void transportReady() {}

@Override public void transportInUse(boolean inUse) {
inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
Runnable r = inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
if (r != null) {
r.run();
}
}
});
boolean savedShutdown;
Expand Down
20 changes: 14 additions & 6 deletions core/src/main/java/io/grpc/internal/TransportSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ Object getLock() {
}

@Override
void handleInUse() {
callback.onInUse(TransportSet.this);
Runnable handleInUse() {
return callback.onInUse(TransportSet.this);
}

@Override
Expand Down Expand Up @@ -370,7 +370,10 @@ public void transportReady() {}

@Override
public void transportInUse(boolean inUse) {
inUseStateAggregator.updateObjectInUse(transport, inUse);
Runnable r = inUseStateAggregator.updateObjectInUse(transport, inUse);
if (r != null) {
r.run();
}
}

@Override
Expand All @@ -379,7 +382,8 @@ public void transportShutdown(Status status) {}
@Override
public void transportTerminated() {
boolean runCallback = false;
inUseStateAggregator.updateObjectInUse(transport, false);
Runnable r = inUseStateAggregator.updateObjectInUse(transport, false);
assert r == null;
synchronized (lock) {
transports.remove(transport);
if (shutdown && transports.isEmpty()) {
Expand Down Expand Up @@ -519,9 +523,13 @@ public void onConnectionClosedByServer(Status status) { }

/**
* Called when the TransportSet's in-use state has changed to true, which means at least one
* transport is in use. This method is called under a lock thus externally synchronized.
* transport is in use. This method is called under a lock thus externally synchronized. If the
* return value is non-{@code null}, the runnable will be executed after releasing the lock.
*/
public void onInUse(TransportSet ts) { }
@CheckReturnValue
public Runnable onInUse(TransportSet ts) {
return null;
}

/**
* Called when the TransportSet's in-use state has changed to false, which means no transport is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,7 @@ private void walkIntoIdleMode(Collection<MockClientTransportInfo> currentTranspo
}

private void forceExitIdleMode() {
channel.exitIdleMode();
// NameResolver is started in the scheduled executor
timer.runDueTasks();
channel.exitIdleModeAndGetLb();
}

private ClientTransport channelTmGetTransportUnwrapped(EquivalentAddressGroup addressGroup) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,7 @@ private void createChannel(
ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE,
executor.scheduledExecutorService, userAgent, interceptors);
// Force-exit the initial idle-mode
channel.exitIdleMode();
// Will start NameResolver in the scheduled executor
assertEquals(1, timer.runDueTasks());
channel.exitIdleModeAndGetLb();
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void setUp() {
ArgumentCaptor<TransportManager<ClientTransport>> tmCaptor
= ArgumentCaptor.forClass(null);
// Force Channel to exit the initial idleness to get NameResolver and LoadBalancer created.
channel.exitIdleMode();
channel.exitIdleModeAndGetLb();
verify(mockNameResolverFactory).newNameResolver(any(URI.class), any(Attributes.class));
verify(mockLoadBalancerFactory).newLoadBalancer(anyString(), tmCaptor.capture());
tm = tmCaptor.getValue();
Expand Down