|
81 | 81 | import io.grpc.internal.JsonParser; |
82 | 82 | import io.grpc.lb.v1.ClientStats; |
83 | 83 | import io.grpc.lb.v1.ClientStatsPerToken; |
| 84 | +import io.grpc.lb.v1.FallbackResponse; |
84 | 85 | import io.grpc.lb.v1.InitialLoadBalanceRequest; |
85 | 86 | import io.grpc.lb.v1.InitialLoadBalanceResponse; |
86 | 87 | import io.grpc.lb.v1.LoadBalanceRequest; |
@@ -288,7 +289,6 @@ public Void answer(InvocationOnMock invocation) throws Throwable { |
288 | 289 | } |
289 | 290 |
|
290 | 291 | @After |
291 | | - @SuppressWarnings("unchecked") |
292 | 292 | public void tearDown() { |
293 | 293 | try { |
294 | 294 | if (balancer != null) { |
@@ -2085,6 +2085,210 @@ public void retrieveModeFromLbConfig_badConfigDefaultToRoundRobin() throws Excep |
2085 | 2085 | assertThat(mode).isEqualTo(Mode.ROUND_ROBIN); |
2086 | 2086 | } |
2087 | 2087 |
|
| 2088 | + @Test |
| 2089 | + public void grpclbWorking_lbSendsFallbackMessage() { |
| 2090 | + InOrder inOrder = inOrder(helper, subchannelPool); |
| 2091 | + List<EquivalentAddressGroup> grpclbResolutionList = |
| 2092 | + createResolvedServerAddresses(true, true, false, false); |
| 2093 | + List<EquivalentAddressGroup> fallbackEags = grpclbResolutionList.subList(2, 4); |
| 2094 | + Attributes grpclbResolutionAttrs = Attributes.EMPTY; |
| 2095 | + deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); |
| 2096 | + |
| 2097 | + // Fallback timer is started as soon as the addresses are resolved. |
| 2098 | + assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); |
| 2099 | + |
| 2100 | + List<SocketAddress> addrs = new ArrayList<>(); |
| 2101 | + addrs.addAll(grpclbResolutionList.get(0).getAddresses()); |
| 2102 | + addrs.addAll(grpclbResolutionList.get(1).getAddresses()); |
| 2103 | + Attributes attr = grpclbResolutionList.get(0).getAttributes(); |
| 2104 | + EquivalentAddressGroup oobChannelEag = new EquivalentAddressGroup(addrs, attr); |
| 2105 | + verify(helper).createOobChannel(eq(oobChannelEag), eq(lbAuthority(0))); |
| 2106 | + assertEquals(1, fakeOobChannels.size()); |
| 2107 | + ManagedChannel oobChannel = fakeOobChannels.poll(); |
| 2108 | + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); |
| 2109 | + StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); |
| 2110 | + assertEquals(1, lbRequestObservers.size()); |
| 2111 | + StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); |
| 2112 | + verify(lbRequestObserver).onNext( |
| 2113 | + eq(LoadBalanceRequest.newBuilder() |
| 2114 | + .setInitialRequest( |
| 2115 | + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) |
| 2116 | + .build())); |
| 2117 | + |
| 2118 | + // Simulate receiving LB response |
| 2119 | + ServerEntry backend1a = new ServerEntry("127.0.0.1", 2000, "token0001"); |
| 2120 | + ServerEntry backend1b = new ServerEntry("127.0.0.1", 2010, "token0002"); |
| 2121 | + List<ServerEntry> backends1 = Arrays.asList(backend1a, backend1b); |
| 2122 | + inOrder.verify(helper, never()) |
| 2123 | + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); |
| 2124 | + logs.clear(); |
| 2125 | + lbResponseObserver.onNext(buildInitialResponse()); |
| 2126 | + assertThat(logs).containsExactly("DEBUG: Got an LB response: " + buildInitialResponse()); |
| 2127 | + logs.clear(); |
| 2128 | + lbResponseObserver.onNext(buildLbResponse(backends1)); |
| 2129 | + |
| 2130 | + inOrder.verify(subchannelPool).takeOrCreateSubchannel( |
| 2131 | + eq(new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS)), |
| 2132 | + any(Attributes.class)); |
| 2133 | + inOrder.verify(subchannelPool).takeOrCreateSubchannel( |
| 2134 | + eq(new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS)), |
| 2135 | + any(Attributes.class)); |
| 2136 | + |
| 2137 | + assertEquals(2, mockSubchannels.size()); |
| 2138 | + Subchannel subchannel1 = mockSubchannels.poll(); |
| 2139 | + Subchannel subchannel2 = mockSubchannels.poll(); |
| 2140 | + |
| 2141 | + verify(subchannel1).requestConnection(); |
| 2142 | + verify(subchannel2).requestConnection(); |
| 2143 | + assertEquals( |
| 2144 | + new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS), |
| 2145 | + subchannel1.getAddresses()); |
| 2146 | + assertEquals( |
| 2147 | + new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS), |
| 2148 | + subchannel2.getAddresses()); |
| 2149 | + |
| 2150 | + deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); |
| 2151 | + deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); |
| 2152 | + |
| 2153 | + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); |
| 2154 | + RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); |
| 2155 | + assertThat(picker0.dropList).containsExactly(null, null); |
| 2156 | + assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY); |
| 2157 | + inOrder.verifyNoMoreInteractions(); |
| 2158 | + |
| 2159 | + assertThat(logs).containsExactly( |
| 2160 | + "DEBUG: Got an LB response: " + buildLbResponse(backends1), |
| 2161 | + "INFO: Using RR list=" |
| 2162 | + + "[[[/127.0.0.1:2000]/{io.grpc.grpclb.lbProvidedBackend=true}](token0001)," |
| 2163 | + + " [[/127.0.0.1:2010]/{io.grpc.grpclb.lbProvidedBackend=true}](token0002)]," |
| 2164 | + + " drop=[null, null]", |
| 2165 | + "INFO: CONNECTING: picks=[BUFFER_ENTRY], drops=[null, null]").inOrder(); |
| 2166 | + logs.clear(); |
| 2167 | + |
| 2168 | + // Let subchannels be connected |
| 2169 | + deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); |
| 2170 | + inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); |
| 2171 | + assertThat(logs).containsExactly( |
| 2172 | + "INFO: READY: picks=" |
| 2173 | + + "[[[[[/127.0.0.1:2010]/{io.grpc.grpclb.lbProvidedBackend=true}]](token0002)]]," |
| 2174 | + + " drops=[null, null]"); |
| 2175 | + logs.clear(); |
| 2176 | + |
| 2177 | + RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); |
| 2178 | + |
| 2179 | + assertThat(picker1.dropList).containsExactly(null, null); |
| 2180 | + assertThat(picker1.pickList).containsExactly( |
| 2181 | + new BackendEntry(subchannel2, getLoadRecorder(), "token0002")); |
| 2182 | + |
| 2183 | + deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); |
| 2184 | + inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); |
| 2185 | + assertThat(logs).containsExactly( |
| 2186 | + "INFO: READY: picks=" |
| 2187 | + + "[[[[[/127.0.0.1:2000]/{io.grpc.grpclb.lbProvidedBackend=true}]](token0001)]," |
| 2188 | + + " [[[[/127.0.0.1:2010]/{io.grpc.grpclb.lbProvidedBackend=true}]](token0002)]]," |
| 2189 | + + " drops=[null, null]"); |
| 2190 | + logs.clear(); |
| 2191 | + |
| 2192 | + RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); |
| 2193 | + assertThat(picker2.dropList).containsExactly(null, null); |
| 2194 | + assertThat(picker2.pickList).containsExactly( |
| 2195 | + new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), |
| 2196 | + new BackendEntry(subchannel2, getLoadRecorder(), "token0002")) |
| 2197 | + .inOrder(); |
| 2198 | + |
| 2199 | + // enter fallback mode |
| 2200 | + lbResponseObserver.onNext(buildLbFallbackResponse()); |
| 2201 | + |
| 2202 | + // existing subchannels must be returned immediately to gracefully shutdown. |
| 2203 | + verify(subchannelPool) |
| 2204 | + .returnSubchannel(eq(subchannel1), eq(ConnectivityStateInfo.forNonError(READY))); |
| 2205 | + verify(subchannelPool) |
| 2206 | + .returnSubchannel(eq(subchannel2), eq(ConnectivityStateInfo.forNonError(READY))); |
| 2207 | + |
| 2208 | + // verify fallback |
| 2209 | + fallbackTestVerifyUseOfFallbackBackendLists(inOrder, fallbackEags); |
| 2210 | + |
| 2211 | + assertFalse(oobChannel.isShutdown()); |
| 2212 | + verify(lbRequestObserver, never()).onCompleted(); |
| 2213 | + |
| 2214 | + // exit fall back by providing two new backends |
| 2215 | + ServerEntry backend2a = new ServerEntry("127.0.0.1", 8000, "token1001"); |
| 2216 | + ServerEntry backend2b = new ServerEntry("127.0.0.1", 8010, "token1002"); |
| 2217 | + List<ServerEntry> backends2 = Arrays.asList(backend2a, backend2b); |
| 2218 | + |
| 2219 | + inOrder.verify(helper, never()) |
| 2220 | + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); |
| 2221 | + logs.clear(); |
| 2222 | + lbResponseObserver.onNext(buildLbResponse(backends2)); |
| 2223 | + |
| 2224 | + inOrder.verify(subchannelPool).takeOrCreateSubchannel( |
| 2225 | + eq(new EquivalentAddressGroup(backend2a.addr, LB_BACKEND_ATTRS)), |
| 2226 | + any(Attributes.class)); |
| 2227 | + inOrder.verify(subchannelPool).takeOrCreateSubchannel( |
| 2228 | + eq(new EquivalentAddressGroup(backend2b.addr, LB_BACKEND_ATTRS)), |
| 2229 | + any(Attributes.class)); |
| 2230 | + |
| 2231 | + assertEquals(2, mockSubchannels.size()); |
| 2232 | + Subchannel subchannel3 = mockSubchannels.poll(); |
| 2233 | + Subchannel subchannel4 = mockSubchannels.poll(); |
| 2234 | + verify(subchannel3).requestConnection(); |
| 2235 | + verify(subchannel4).requestConnection(); |
| 2236 | + assertEquals( |
| 2237 | + new EquivalentAddressGroup(backend2a.addr, LB_BACKEND_ATTRS), |
| 2238 | + subchannel3.getAddresses()); |
| 2239 | + assertEquals( |
| 2240 | + new EquivalentAddressGroup(backend2b.addr, LB_BACKEND_ATTRS), |
| 2241 | + subchannel4.getAddresses()); |
| 2242 | + |
| 2243 | + deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(CONNECTING)); |
| 2244 | + deliverSubchannelState(subchannel4, ConnectivityStateInfo.forNonError(CONNECTING)); |
| 2245 | + |
| 2246 | + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); |
| 2247 | + RoundRobinPicker picker6 = (RoundRobinPicker) pickerCaptor.getValue(); |
| 2248 | + assertThat(picker6.dropList).containsExactly(null, null); |
| 2249 | + assertThat(picker6.pickList).containsExactly(BUFFER_ENTRY); |
| 2250 | + inOrder.verifyNoMoreInteractions(); |
| 2251 | + |
| 2252 | + assertThat(logs).containsExactly( |
| 2253 | + "DEBUG: Got an LB response: " + buildLbResponse(backends2), |
| 2254 | + "INFO: Using RR list=" |
| 2255 | + + "[[[/127.0.0.1:8000]/{io.grpc.grpclb.lbProvidedBackend=true}](token1001)," |
| 2256 | + + " [[/127.0.0.1:8010]/{io.grpc.grpclb.lbProvidedBackend=true}](token1002)]," |
| 2257 | + + " drop=[null, null]", |
| 2258 | + "INFO: CONNECTING: picks=[BUFFER_ENTRY], drops=[null, null]").inOrder(); |
| 2259 | + logs.clear(); |
| 2260 | + |
| 2261 | + // Let new subchannels be connected |
| 2262 | + deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY)); |
| 2263 | + inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); |
| 2264 | + assertThat(logs).containsExactly( |
| 2265 | + "INFO: READY: picks=" |
| 2266 | + + "[[[[[/127.0.0.1:8000]/{io.grpc.grpclb.lbProvidedBackend=true}]](token1001)]]," |
| 2267 | + + " drops=[null, null]"); |
| 2268 | + logs.clear(); |
| 2269 | + |
| 2270 | + RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); |
| 2271 | + assertThat(picker3.dropList).containsExactly(null, null); |
| 2272 | + assertThat(picker3.pickList).containsExactly( |
| 2273 | + new BackendEntry(subchannel3, getLoadRecorder(), "token1001")); |
| 2274 | + |
| 2275 | + deliverSubchannelState(subchannel4, ConnectivityStateInfo.forNonError(READY)); |
| 2276 | + inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); |
| 2277 | + assertThat(logs).containsExactly( |
| 2278 | + "INFO: READY: picks=" |
| 2279 | + + "[[[[[/127.0.0.1:8000]/{io.grpc.grpclb.lbProvidedBackend=true}]](token1001)]," |
| 2280 | + + " [[[[/127.0.0.1:8010]/{io.grpc.grpclb.lbProvidedBackend=true}]](token1002)]]," |
| 2281 | + + " drops=[null, null]"); |
| 2282 | + logs.clear(); |
| 2283 | + |
| 2284 | + RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); |
| 2285 | + assertThat(picker4.dropList).containsExactly(null, null); |
| 2286 | + assertThat(picker4.pickList).containsExactly( |
| 2287 | + new BackendEntry(subchannel3, getLoadRecorder(), "token1001"), |
| 2288 | + new BackendEntry(subchannel4, getLoadRecorder(), "token1002")) |
| 2289 | + .inOrder(); |
| 2290 | + } |
| 2291 | + |
2088 | 2292 | @SuppressWarnings("deprecation") |
2089 | 2293 | private void deliverSubchannelState( |
2090 | 2294 | final Subchannel subchannel, final ConnectivityStateInfo newState) { |
@@ -2154,7 +2358,13 @@ private static LoadBalanceResponse buildInitialResponse(long loadReportIntervalM |
2154 | 2358 | return LoadBalanceResponse.newBuilder() |
2155 | 2359 | .setInitialResponse( |
2156 | 2360 | InitialLoadBalanceResponse.newBuilder() |
2157 | | - .setClientStatsReportInterval(Durations.fromMillis(loadReportIntervalMillis))) |
| 2361 | + .setClientStatsReportInterval(Durations.fromMillis(loadReportIntervalMillis))) |
| 2362 | + .build(); |
| 2363 | + } |
| 2364 | + |
| 2365 | + private static LoadBalanceResponse buildLbFallbackResponse() { |
| 2366 | + return LoadBalanceResponse.newBuilder() |
| 2367 | + .setFallbackResponse(FallbackResponse.newBuilder().build()) |
2158 | 2368 | .build(); |
2159 | 2369 | } |
2160 | 2370 |
|
|
0 commit comments