Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 31 additions & 33 deletions rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ final class CachingRlsLbClient {
private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
private final ChannelLogger logger;
private final ChildPolicyWrapper fallbackChildPolicyWrapper;
private ConnectivityState lastRlsServerConnectivityState;
private boolean wasReady;
private boolean wasInTransientFailure;

static {
MetricInstrumentRegistry metricInstrumentRegistry
Expand Down Expand Up @@ -216,6 +219,9 @@ private CachingRlsLbClient(Builder builder) {
rlsChannelBuilder.disableServiceConfigLookUp();
}
rlsChannel = rlsChannelBuilder.build();
lastRlsServerConnectivityState = rlsChannel.getState(false);
rlsChannel.notifyWhenStateChanged(
lastRlsServerConnectivityState, () -> rlsServerConnectionStateChanged());
rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
childLbResolvedAddressFactory =
checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
Expand All @@ -225,8 +231,7 @@ private CachingRlsLbClient(Builder builder) {
refCountedChildPolicyWrapperFactory =
new RefCountedChildPolicyWrapperFactory(
lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
childLbHelperProvider,
new BackoffRefreshListener());
childLbHelperProvider);
// TODO(creamsoup) wait until lb is ready
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
if (defaultTarget != null && !defaultTarget.isEmpty()) {
Expand Down Expand Up @@ -257,6 +262,26 @@ public void accept(BatchRecorder recorder) {
logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
}

private void rlsServerConnectionStateChanged() {
ConnectivityState currentState = rlsChannel.getState(false);
if (!wasInTransientFailure && currentState == ConnectivityState.READY) {
wasReady = true;
} else if (wasReady && currentState == ConnectivityState.TRANSIENT_FAILURE) {
wasInTransientFailure = true;
} else if (wasInTransientFailure && currentState == ConnectivityState.READY) {
wasInTransientFailure = false;
synchronized (lock) {
for (CacheEntry value : linkedHashLruCache.values()) {
if (value instanceof BackoffCacheEntry) {
refreshBackoffEntry((BackoffCacheEntry) value);
}
}
}
}
rlsChannel.notifyWhenStateChanged(currentState, () -> rlsServerConnectionStateChanged());
lastRlsServerConnectivityState = currentState;
}

void init() {
synchronized (lock) {
refCountedChildPolicyWrapperFactory.init();
Expand Down Expand Up @@ -439,7 +464,7 @@ private BackoffCacheEntry createBackOffEntry(
ChannelLogLevel.DEBUG,
"[RLS Entry {0}] Transition to back off: status={1}, delayNanos={2}",
request, status, delayNanos);
BackoffCacheEntry entry = new BackoffCacheEntry(request, status, backoffPolicy);
BackoffCacheEntry entry = new BackoffCacheEntry(request, status);
// Lock is held, so the task can't execute before the assignment
entry.scheduledFuture = scheduledExecutorService.schedule(
() -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS);
Expand All @@ -457,7 +482,8 @@ private void refreshBackoffEntry(BackoffCacheEntry entry) {
logger.log(ChannelLogLevel.DEBUG,
"[RLS Entry {0}] Calling RLS for transition to pending", entry.request);
linkedHashLruCache.invalidate(entry.request);
asyncRlsCall(entry.request, entry.backoffPolicy);
// Cache updated. updateBalancingState() to reattempt picks
helper.triggerPendingRpcProcessing();
}
}

Expand Down Expand Up @@ -761,13 +787,11 @@ public String toString() {
private static final class BackoffCacheEntry extends CacheEntry {

private final Status status;
private final BackoffPolicy backoffPolicy;
private Future<?> scheduledFuture;

BackoffCacheEntry(RouteLookupRequest request, Status status, BackoffPolicy backoffPolicy) {
BackoffCacheEntry(RouteLookupRequest request, Status status) {
super(request);
this.status = checkNotNull(status, "status");
this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy");
}

Status getStatus() {
Expand Down Expand Up @@ -944,32 +968,6 @@ public CacheEntry cacheAndClean(RouteLookupRequest key, CacheEntry value) {
}
}

/**
* LbStatusListener refreshes {@link BackoffCacheEntry} when lb state is changed to {@link
* ConnectivityState#READY} from {@link ConnectivityState#TRANSIENT_FAILURE}.
*/
private final class BackoffRefreshListener implements ChildLbStatusListener {

@Nullable
private ConnectivityState prevState = null;

@Override
public void onStatusChanged(ConnectivityState newState) {
if (prevState == ConnectivityState.TRANSIENT_FAILURE
&& newState == ConnectivityState.READY) {
logger.log(ChannelLogLevel.DEBUG, "Transitioning from TRANSIENT_FAILURE to READY");
synchronized (lock) {
for (CacheEntry value : linkedHashLruCache.values()) {
if (value instanceof BackoffCacheEntry) {
refreshBackoffEntry((BackoffCacheEntry) value);
}
}
}
}
prevState = newState;
}
}

/** A header will be added when RLS server respond with additional header data. */
@VisibleForTesting
static final Metadata.Key<String> RLS_DATA_KEY =
Expand Down
20 changes: 5 additions & 15 deletions rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,20 +210,17 @@ static final class RefCountedChildPolicyWrapperFactory {
new HashMap<>();

private final ChildLoadBalancerHelperProvider childLbHelperProvider;
private final ChildLbStatusListener childLbStatusListener;
private final ChildLoadBalancingPolicy childPolicy;
private ResolvedAddressFactory childLbResolvedAddressFactory;

public RefCountedChildPolicyWrapperFactory(
ChildLoadBalancingPolicy childPolicy,
ResolvedAddressFactory childLbResolvedAddressFactory,
ChildLoadBalancerHelperProvider childLbHelperProvider,
ChildLbStatusListener childLbStatusListener) {
ChildLoadBalancerHelperProvider childLbHelperProvider) {
this.childPolicy = checkNotNull(childPolicy, "childPolicy");
this.childLbResolvedAddressFactory =
checkNotNull(childLbResolvedAddressFactory, "childLbResolvedAddressFactory");
this.childLbHelperProvider = checkNotNull(childLbHelperProvider, "childLbHelperProvider");
this.childLbStatusListener = checkNotNull(childLbStatusListener, "childLbStatusListener");
}

void init() {
Expand All @@ -248,8 +245,7 @@ ChildPolicyWrapper createOrGet(String target) {
RefCountedChildPolicyWrapper pooledChildPolicyWrapper = childPolicyMap.get(target);
if (pooledChildPolicyWrapper == null) {
ChildPolicyWrapper childPolicyWrapper = new ChildPolicyWrapper(
target, childPolicy, childLbResolvedAddressFactory, childLbHelperProvider,
childLbStatusListener);
target, childPolicy, childLbResolvedAddressFactory, childLbHelperProvider);
pooledChildPolicyWrapper = RefCountedChildPolicyWrapper.of(childPolicyWrapper);
childPolicyMap.put(target, pooledChildPolicyWrapper);
return pooledChildPolicyWrapper.getObject();
Expand Down Expand Up @@ -299,11 +295,9 @@ public ChildPolicyWrapper(
String target,
ChildLoadBalancingPolicy childPolicy,
final ResolvedAddressFactory childLbResolvedAddressFactory,
ChildLoadBalancerHelperProvider childLbHelperProvider,
ChildLbStatusListener childLbStatusListener) {
ChildLoadBalancerHelperProvider childLbHelperProvider) {
this.target = target;
this.helper =
new ChildPolicyReportingHelper(childLbHelperProvider, childLbStatusListener);
this.helper = new ChildPolicyReportingHelper(childLbHelperProvider);
LoadBalancerProvider lbProvider = childPolicy.getEffectiveLbProvider();
final ConfigOrError lbConfig =
lbProvider
Expand Down Expand Up @@ -386,14 +380,11 @@ public String toString() {
final class ChildPolicyReportingHelper extends ForwardingLoadBalancerHelper {

private final ChildLoadBalancerHelper delegate;
private final ChildLbStatusListener listener;

ChildPolicyReportingHelper(
ChildLoadBalancerHelperProvider childHelperProvider,
ChildLbStatusListener listener) {
ChildLoadBalancerHelperProvider childHelperProvider) {
checkNotNull(childHelperProvider, "childHelperProvider");
this.delegate = childHelperProvider.forTarget(getTarget());
this.listener = checkNotNull(listener, "listener");
}

@Override
Expand All @@ -406,7 +397,6 @@ public void updateBalancingState(ConnectivityState newState, SubchannelPicker ne
picker = newPicker;
state = newState;
super.updateBalancingState(newState, newPicker);
listener.onStatusChanged(newState);
}
}
}
Expand Down
124 changes: 103 additions & 21 deletions rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,18 @@
import io.grpc.MetricRecorder.BatchRecorder;
import io.grpc.MetricRecorder.Registration;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.FakeClock;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.PickSubchannelArgsImpl;
import io.grpc.internal.SharedResourcePool;
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
import io.grpc.rls.CachingRlsLbClient.CacheEntry;
import io.grpc.rls.CachingRlsLbClient.CachedRouteLookupResponse;
Expand Down Expand Up @@ -96,10 +100,13 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -160,8 +167,9 @@ public void uncaughtException(Thread t, Throwable e) {
fakeClock.getScheduledExecutorService());
private final ChildLoadBalancingPolicy childLbPolicy =
new ChildLoadBalancingPolicy("target", Collections.<String, Object>emptyMap(), lbProvider);
private final FakeHelper fakeHelper = new FakeHelper();
private final Helper helper =
mock(Helper.class, delegatesTo(new FakeHelper()));
mock(Helper.class, delegatesTo(fakeHelper));
private final FakeThrottler fakeThrottler = new FakeThrottler();
private final LbPolicyConfiguration lbPolicyConfiguration =
new LbPolicyConfiguration(ROUTE_LOOKUP_CONFIG, null, childLbPolicy);
Expand Down Expand Up @@ -325,29 +333,94 @@ public void get_throttledAndRecover() throws Exception {

assertThat(resp.hasError()).isTrue();

// let it pass throttler
fakeThrottler.nextResult = false;
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
// initially backed off entry is backed off again
// Backoff entry evicted from cache.
verify(evictionListener)
.onEviction(eq(routeLookupRequest), any(CacheEntry.class), eq(EvictionType.EXPLICIT));
// Assert that Rls LB policy picker was updated.
assertThat(fakeHelper.lastPicker.toString()).isEqualTo("RlsPicker{target=service1}");
}

resp = getInSyncContext(routeLookupRequest);

assertThat(resp.hasError()).isTrue();

// let it pass throttler
fakeThrottler.nextResult = false;
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
@Test
public void controlPlaneTransientToReady_backOffEntriesRemovedAndPickerUpdated()
throws Exception {
setUpRlsLbClient();
final ConnectivityState[] rlsChannelState = new ConnectivityState[1];
Runnable channelStateListener = new Runnable() {
@Override
public void run() {
rlsChannelState[0] = fakeHelper.oobChannel.getState(false);
fakeHelper.oobChannel.notifyWhenStateChanged(rlsChannelState[0], this);
synchronized (this) {
notify();
}
}
};
fakeHelper.oobChannel.notifyWhenStateChanged(fakeHelper.oobChannel.getState(false),
channelStateListener);
RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of(
"server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar"));
rlsServerImpl.setLookupTable(
ImmutableMap.of(
routeLookupRequest,
RouteLookupResponse.create(ImmutableList.of("target"), "header")));

CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest);
assertThat(resp.isPending()).isTrue();
// server response
fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
resp = getInSyncContext(routeLookupRequest);
assertThat(resp.hasData()).isTrue();

fakeHelper.server.shutdown();
// Channel goes to IDLE state from the shutdown listener handling.
try {
if (!fakeHelper.server.awaitTermination(10, TimeUnit.SECONDS)) {
fakeHelper.server.shutdownNow(); // Forceful shutdown if graceful timeout expires
}
} catch (InterruptedException e) {
fakeHelper.server.shutdownNow();
}
// Use a different key to cause a cache miss and trigger a RPC.
RouteLookupRequest routeLookupRequest2 = RouteLookupRequest.create(ImmutableMap.of(
"server", "bigtable.googleapis.com", "service-key", "foo2", "method-key", "bar"));
// Rls channel will go to TRANSIENT_FAILURE (back-off) because the picker notices the
// subchannel state IDLE and the server transport listener is null.
resp = getInSyncContext(routeLookupRequest2);
assertThat(resp.isPending()).isTrue();
assertThat(rlsChannelState[0]).isEqualTo(ConnectivityState.TRANSIENT_FAILURE);
// Throttle the next rpc call.
fakeThrottler.nextResult = true;
fakeBackoffProvider.nextPolicy = createBackoffPolicy(10, TimeUnit.MILLISECONDS);

// server responses
fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
// Cause another cache miss by using a new request key. This will create a back-off Rls
// cache entry.
RouteLookupRequest routeLookupRequest3 = RouteLookupRequest.create(ImmutableMap.of(
"server", "bigtable.googleapis.com", "service-key", "foo3", "method-key", "bar"));
resp = getInSyncContext(routeLookupRequest3);

resp = getInSyncContext(routeLookupRequest);
assertThat(resp.hasError()).isTrue();

assertThat(resp.hasData()).isTrue();
fakeHelper.createServerAndRegister("service1");
// Wait for Rls subchannel back-off expiry and its moving to READY
synchronized (channelStateListener) {
channelStateListener.wait(5000);
}
assertThat(rlsChannelState[0]).isEqualTo(ConnectivityState.READY);
final ObjectPool<? extends Executor> defaultExecutorPool =
SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
AtomicBoolean isSuccess = new AtomicBoolean(false);
((ExecutorService) defaultExecutorPool.getObject()).submit(() -> {
// Backoff entry evicted from cache.
verify(evictionListener)
.onEviction(eq(routeLookupRequest3), any(CacheEntry.class), eq(EvictionType.EXPLICIT));
// Assert that Rls LB policy picker was updated.
assertThat(fakeHelper.lastPicker.toString()).isEqualTo("RlsPicker{target=service1}");
isSuccess.set(true);
}).get();
assertThat(isSuccess.get()).isTrue();
}

@Test
Expand Down Expand Up @@ -894,16 +967,24 @@ public void run() {

private final class FakeHelper extends Helper {

SubchannelPicker lastPicker;
Server server;
ManagedChannel oobChannel;

void createServerAndRegister(String target) throws IOException {
server = InProcessServerBuilder.forName(target)
.addService(rlsServerImpl)
.directExecutor()
.build()
.start();
grpcCleanupRule.register(server);
}

@Override
public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
String target, ChannelCredentials creds) {
try {
grpcCleanupRule.register(
InProcessServerBuilder.forName(target)
.addService(rlsServerImpl)
.directExecutor()
.build()
.start());
createServerAndRegister(target);
} catch (IOException e) {
throw new RuntimeException("cannot create server: " + target, e);
}
Expand All @@ -919,7 +1000,8 @@ protected ManagedChannelBuilder<?> delegate() {

@Override
public ManagedChannel build() {
return grpcCleanupRule.register(super.build());
oobChannel = super.build();
return grpcCleanupRule.register(oobChannel);
}

@Override
Expand Down Expand Up @@ -948,7 +1030,7 @@ public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String author
@Override
public void updateBalancingState(
@Nonnull ConnectivityState newState, @Nonnull SubchannelPicker newPicker) {
// no-op
lastPicker = newPicker;
}

@Override
Expand Down
Loading
Loading