3232import com .google .cloud .GrpcTransportOptions .ExecutorFactory ;
3333import com .google .cloud .spanner .SessionPool .Clock ;
3434import com .google .cloud .spanner .SessionPool .PooledSession ;
35+ import com .google .common .util .concurrent .ListenableFuture ;
3536import com .google .common .util .concurrent .Uninterruptibles ;
3637import java .util .ArrayList ;
3738import java .util .Arrays ;
@@ -114,6 +115,123 @@ public void poolClosure() throws Exception {
114115 pool .closeAsync ().get ();
115116 }
116117
118+ @ Test
119+ public void poolClosureFailsPendingReadWaiters () throws Exception {
120+ final CountDownLatch insideCreation = new CountDownLatch (1 );
121+ final CountDownLatch releaseCreation = new CountDownLatch (1 );
122+ when (client .createSession (db ))
123+ .thenReturn (mock (Session .class ))
124+ .thenAnswer (
125+ new Answer <Session >() {
126+ @ Override
127+ public Session answer (InvocationOnMock invocation ) throws Throwable {
128+ insideCreation .countDown ();
129+ releaseCreation .await ();
130+ return mock (Session .class );
131+ }
132+ });
133+ pool = createPool ();
134+ pool .getReadSession ();
135+ AtomicBoolean failed = new AtomicBoolean (false );
136+ CountDownLatch latch = new CountDownLatch (1 );
137+ getSessionAsync (latch , failed );
138+ insideCreation .await ();
139+ pool .closeAsync ();
140+ releaseCreation .countDown ();
141+ latch .await ();
142+ assertThat (failed .get ()).isTrue ();
143+ }
144+
145+ @ Test
146+ public void poolClosureFailsPendingWriteWaiters () throws Exception {
147+ final CountDownLatch insideCreation = new CountDownLatch (1 );
148+ final CountDownLatch releaseCreation = new CountDownLatch (1 );
149+ when (client .createSession (db ))
150+ .thenReturn (mock (Session .class ))
151+ .thenAnswer (
152+ new Answer <Session >() {
153+ @ Override
154+ public Session answer (InvocationOnMock invocation ) throws Throwable {
155+ insideCreation .countDown ();
156+ releaseCreation .await ();
157+ return mock (Session .class );
158+ }
159+ });
160+ pool = createPool ();
161+ pool .getReadSession ();
162+ AtomicBoolean failed = new AtomicBoolean (false );
163+ CountDownLatch latch = new CountDownLatch (1 );
164+ getReadWriteSessionAsync (latch , failed );
165+ insideCreation .await ();
166+ pool .closeAsync ();
167+ releaseCreation .countDown ();
168+ latch .await ();
169+ assertThat (failed .get ()).isTrue ();
170+ }
171+
172+ @ Test
173+ public void poolClosesEvenIfCreationFails () throws Exception {
174+ final CountDownLatch insideCreation = new CountDownLatch (1 );
175+ final CountDownLatch releaseCreation = new CountDownLatch (1 );
176+ when (client .createSession (db ))
177+ .thenAnswer (
178+ new Answer <Session >() {
179+ @ Override
180+ public Session answer (InvocationOnMock invocation ) throws Throwable {
181+ insideCreation .countDown ();
182+ releaseCreation .await ();
183+ throw SpannerExceptionFactory .newSpannerException (new RuntimeException ());
184+ }
185+ });
186+ pool = createPool ();
187+ AtomicBoolean failed = new AtomicBoolean (false );
188+ CountDownLatch latch = new CountDownLatch (1 );
189+ getSessionAsync (latch , failed );
190+ insideCreation .await ();
191+ ListenableFuture <Void > f = pool .closeAsync ();
192+ releaseCreation .countDown ();
193+ f .get ();
194+ assertThat (f .isDone ()).isTrue ();
195+ }
196+
197+ @ Test
198+ public void poolClosesEvenIfPreparationFails () throws Exception {
199+ Session session = mock (Session .class );
200+ when (client .createSession (db )).thenReturn (session );
201+ final CountDownLatch insidePrepare = new CountDownLatch (1 );
202+ final CountDownLatch releasePrepare = new CountDownLatch (1 );
203+ doAnswer (
204+ new Answer <Session >() {
205+ @ Override
206+ public Session answer (InvocationOnMock invocation ) throws Throwable {
207+ insidePrepare .countDown ();
208+ releasePrepare .await ();
209+ throw SpannerExceptionFactory .newSpannerException (new RuntimeException ());
210+ }
211+ })
212+ .when (session )
213+ .prepareReadWriteTransaction ();
214+ pool = createPool ();
215+ AtomicBoolean failed = new AtomicBoolean (false );
216+ CountDownLatch latch = new CountDownLatch (1 );
217+ getReadWriteSessionAsync (latch , failed );
218+ insidePrepare .await ();
219+ ListenableFuture <Void > f = pool .closeAsync ();
220+ releasePrepare .countDown ();
221+ f .get ();
222+ assertThat (f .isDone ()).isTrue ();
223+ }
224+
225+ @ Test
226+ public void poolClosureFailsNewRequests () throws Exception {
227+ when (client .createSession (db )).thenReturn (mock (Session .class ));
228+ pool = createPool ();
229+ pool .getReadSession ();
230+ pool .closeAsync ();
231+ expectedException .expect (IllegalStateException .class );
232+ pool .getReadSession ();
233+ }
234+
117235 @ Test
118236 public void atMostMaxSessionsCreated () {
119237 AtomicBoolean failed = new AtomicBoolean (false );
@@ -243,6 +361,35 @@ public Void answer(InvocationOnMock arg0) throws Throwable {
243361 writeSession .close ();
244362 }
245363
364+ @ Test
365+ public void getReadSessionFallsBackToWritePreparedSession () throws Exception {
366+ Session mockSession1 = mock (Session .class );
367+ final CountDownLatch prepareLatch = new CountDownLatch (2 );
368+ doAnswer (
369+ new Answer <Void >() {
370+ @ Override
371+ public Void answer (InvocationOnMock arg0 ) throws Throwable {
372+ prepareLatch .countDown ();
373+ return null ;
374+ }
375+ })
376+ .when (mockSession1 )
377+ .prepareReadWriteTransaction ();
378+ when (client .createSession (db )).thenReturn (mockSession1 );
379+ options =
380+ SessionPoolOptions .newBuilder ()
381+ .setMinSessions (minSessions )
382+ .setMaxSessions (1 )
383+ .setWriteSessionsFraction (1.0f )
384+ .build ();
385+ pool = createPool ();
386+ pool .getReadWriteSession ().close ();
387+ prepareLatch .await ();
388+ // This session should also be write prepared.
389+ PooledSession readSession = (PooledSession ) pool .getReadSession ();
390+ verify (readSession .delegate , times (2 )).prepareReadWriteTransaction ();
391+ }
392+
246393 @ Test
247394 public void failOnPoolExhaustion () {
248395 options =
@@ -262,6 +409,18 @@ public void failOnPoolExhaustion() {
262409 session1 .close ();
263410 }
264411
412+ @ Test
413+ public void poolWorksWhenSessionNotFound () {
414+ Session mockSession1 = mock (Session .class );
415+ Session mockSession2 = mock (Session .class );
416+ doThrow (SpannerExceptionFactory .newSpannerException (ErrorCode .NOT_FOUND , "Session not found" ))
417+ .when (mockSession1 )
418+ .prepareReadWriteTransaction ();
419+ when (client .createSession (db )).thenReturn (mockSession1 ).thenReturn (mockSession2 );
420+ pool = createPool ();
421+ assertThat (((PooledSession ) pool .getReadWriteSession ()).delegate ).isEqualTo (mockSession2 );
422+ }
423+
265424 @ Test
266425 public void idleSessionCleanup () throws Exception {
267426 options =
@@ -366,6 +525,9 @@ public void run() {
366525 try (Session session = pool .getReadSession ()) {
367526 failed .compareAndSet (false , session == null );
368527 Uninterruptibles .sleepUninterruptibly (10 , TimeUnit .MILLISECONDS );
528+ } catch (SpannerException e ) {
529+ failed .compareAndSet (false , true );
530+ } finally {
369531 latch .countDown ();
370532 }
371533 }
@@ -381,6 +543,9 @@ public void run() {
381543 try (Session session = pool .getReadWriteSession ()) {
382544 failed .compareAndSet (false , session == null );
383545 Uninterruptibles .sleepUninterruptibly (2 , TimeUnit .MILLISECONDS );
546+ } catch (SpannerException e ) {
547+ failed .compareAndSet (false , true );
548+ } finally {
384549 latch .countDown ();
385550 }
386551 }
0 commit comments