Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.cloud.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.SessionPool.Clock;
import com.google.cloud.spanner.SessionPool.PooledSession;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -114,6 +115,123 @@ public void poolClosure() throws Exception {
pool.closeAsync().get();
}

@Test
public void poolClosureFailsPendingReadWaiters() throws Exception {
final CountDownLatch insideCreation = new CountDownLatch(1);
final CountDownLatch releaseCreation = new CountDownLatch(1);
when(client.createSession(db))
.thenReturn(mock(Session.class))
.thenAnswer(
new Answer<Session>() {
@Override
public Session answer(InvocationOnMock invocation) throws Throwable {
insideCreation.countDown();
releaseCreation.await();
return mock(Session.class);
}
});
pool = createPool();
pool.getReadSession();
AtomicBoolean failed = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
getSessionAsync(latch, failed);
insideCreation.await();
pool.closeAsync();
releaseCreation.countDown();
latch.await();
assertThat(failed.get()).isTrue();
}

@Test
public void poolClosureFailsPendingWriteWaiters() throws Exception {
final CountDownLatch insideCreation = new CountDownLatch(1);
final CountDownLatch releaseCreation = new CountDownLatch(1);
when(client.createSession(db))
.thenReturn(mock(Session.class))
.thenAnswer(
new Answer<Session>() {
@Override
public Session answer(InvocationOnMock invocation) throws Throwable {
insideCreation.countDown();
releaseCreation.await();
return mock(Session.class);
}
});
pool = createPool();
pool.getReadSession();
AtomicBoolean failed = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
getReadWriteSessionAsync(latch, failed);
insideCreation.await();
pool.closeAsync();
releaseCreation.countDown();
latch.await();
assertThat(failed.get()).isTrue();
}

@Test
public void poolClosesEvenIfCreationFails() throws Exception {
final CountDownLatch insideCreation = new CountDownLatch(1);
final CountDownLatch releaseCreation = new CountDownLatch(1);
when(client.createSession(db))
.thenAnswer(
new Answer<Session>() {
@Override
public Session answer(InvocationOnMock invocation) throws Throwable {
insideCreation.countDown();
releaseCreation.await();
throw SpannerExceptionFactory.newSpannerException(new RuntimeException());
}
});
pool = createPool();
AtomicBoolean failed = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
getSessionAsync(latch, failed);
insideCreation.await();
ListenableFuture<Void> f = pool.closeAsync();
releaseCreation.countDown();
f.get();
assertThat(f.isDone()).isTrue();
}

@Test
public void poolClosesEvenIfPreparationFails() throws Exception {
Session session = mock(Session.class);
when(client.createSession(db)).thenReturn(session);
final CountDownLatch insidePrepare = new CountDownLatch(1);
final CountDownLatch releasePrepare = new CountDownLatch(1);
doAnswer(
new Answer<Session>() {
@Override
public Session answer(InvocationOnMock invocation) throws Throwable {
insidePrepare.countDown();
releasePrepare.await();
throw SpannerExceptionFactory.newSpannerException(new RuntimeException());
}
})
.when(session)
.prepareReadWriteTransaction();
pool = createPool();
AtomicBoolean failed = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
getReadWriteSessionAsync(latch, failed);
insidePrepare.await();
ListenableFuture<Void> f = pool.closeAsync();
releasePrepare.countDown();
f.get();
assertThat(f.isDone()).isTrue();
}

@Test
public void poolClosureFailsNewRequests() throws Exception {
when(client.createSession(db)).thenReturn(mock(Session.class));
pool = createPool();
pool.getReadSession();
pool.closeAsync();
expectedException.expect(IllegalStateException.class);
pool.getReadSession();
}

@Test
public void atMostMaxSessionsCreated() {
AtomicBoolean failed = new AtomicBoolean(false);
Expand Down Expand Up @@ -243,6 +361,35 @@ public Void answer(InvocationOnMock arg0) throws Throwable {
writeSession.close();
}

@Test
public void getReadSessionFallsBackToWritePreparedSession() throws Exception {
Session mockSession1 = mock(Session.class);
final CountDownLatch prepareLatch = new CountDownLatch(2);
doAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock arg0) throws Throwable {
prepareLatch.countDown();
return null;
}
})
.when(mockSession1)
.prepareReadWriteTransaction();
when(client.createSession(db)).thenReturn(mockSession1);
options =
SessionPoolOptions.newBuilder()
.setMinSessions(minSessions)
.setMaxSessions(1)
.setWriteSessionsFraction(1.0f)
.build();
pool = createPool();
pool.getReadWriteSession().close();
prepareLatch.await();
// This session should also be write prepared.
PooledSession readSession = (PooledSession) pool.getReadSession();
verify(readSession.delegate, times(2)).prepareReadWriteTransaction();
}

@Test
public void failOnPoolExhaustion() {
options =
Expand All @@ -262,6 +409,18 @@ public void failOnPoolExhaustion() {
session1.close();
}

@Test
public void poolWorksWhenSessionNotFound() {
Session mockSession1 = mock(Session.class);
Session mockSession2 = mock(Session.class);
doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.NOT_FOUND, "Session not found"))
.when(mockSession1)
.prepareReadWriteTransaction();
when(client.createSession(db)).thenReturn(mockSession1).thenReturn(mockSession2);
pool = createPool();
assertThat(((PooledSession) pool.getReadWriteSession()).delegate).isEqualTo(mockSession2);
}

@Test
public void idleSessionCleanup() throws Exception {
options =
Expand Down Expand Up @@ -366,6 +525,9 @@ public void run() {
try (Session session = pool.getReadSession()) {
failed.compareAndSet(false, session == null);
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
} catch (SpannerException e) {
failed.compareAndSet(false, true);
} finally {
latch.countDown();
}
}
Expand All @@ -381,6 +543,9 @@ public void run() {
try (Session session = pool.getReadWriteSession()) {
failed.compareAndSet(false, session == null);
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
} catch (SpannerException e) {
failed.compareAndSet(false, true);
} finally {
latch.countDown();
}
}
Expand Down