diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 9198dffb1e5e..ef37267d724e 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -28,8 +28,7 @@ import com.google.cloud.spanner.AbstractResultSet.ResumableStreamIterator; import com.google.cloud.spanner.Options.QueryOption; import com.google.cloud.spanner.Options.ReadOption; -import com.google.cloud.spanner.SpannerImpl.SessionImpl; -import com.google.cloud.spanner.SpannerImpl.SessionTransaction; +import com.google.cloud.spanner.SessionImpl.SessionTransaction; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.protobuf.ByteString; import com.google.spanner.v1.BeginTransactionRequest; @@ -205,7 +204,7 @@ ByteString getTransactionId() { } void initTransaction() { - SpannerImpl.throwIfTransactionsPending(); + SessionImpl.throwIfTransactionsPending(); // Since we only support synchronous calls, just block on "txnLock" while the RPC is in // flight. Note that we use the strategy of sending an explicit BeginTransaction() RPC, diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java index eb52cb035381..8840abc7415a 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java @@ -21,7 +21,6 @@ import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction; import com.google.cloud.spanner.Options.QueryOption; import com.google.cloud.spanner.Options.ReadOption; -import com.google.cloud.spanner.SpannerImpl.SessionImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java index 7ca7af4c6293..cefd128c499c 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java @@ -18,8 +18,7 @@ import static com.google.common.base.Preconditions.checkState; -import com.google.cloud.spanner.SpannerImpl.SessionImpl; -import com.google.cloud.spanner.SpannerImpl.SessionTransaction; +import com.google.cloud.spanner.SessionImpl.SessionTransaction; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.protobuf.ByteString; import com.google.spanner.v1.BeginTransactionRequest; diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java new file mode 100644 index 000000000000..6b1314f49836 --- /dev/null +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -0,0 +1,277 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction; +import com.google.cloud.spanner.AbstractReadContext.SingleReadContext; +import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction; +import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; +import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; +import com.google.spanner.v1.BeginTransactionRequest; +import com.google.spanner.v1.CommitRequest; +import com.google.spanner.v1.CommitResponse; +import com.google.spanner.v1.Transaction; +import com.google.spanner.v1.TransactionOptions; +import io.opencensus.common.Scope; +import io.opencensus.trace.Span; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import javax.annotation.Nullable; + +/** + * Implementation of {@link Session}. Sessions are managed internally by the client library, and + * users need not be aware of the actual session management, pooling and handling. + */ +class SessionImpl implements Session { + private static final Tracer tracer = Tracing.getTracer(); + + /** Keep track of running transactions on this session per thread. */ + static final ThreadLocal hasPendingTransaction = + new ThreadLocal() { + @Override + protected Boolean initialValue() { + return false; + } + }; + + static void throwIfTransactionsPending() { + if (hasPendingTransaction.get() == Boolean.TRUE) { + throw newSpannerException(ErrorCode.INTERNAL, "Nested transactions are not supported"); + } + } + + /** + * Represents a transaction within a session. "Transaction" here is used in the general sense, + * which covers standalone reads, standalone writes, single-use and multi-use read-only + * transactions, and read-write transactions. The defining characteristic is that a session may + * only have one such transaction active at a time. + */ + static interface SessionTransaction { + /** Invalidates the transaction, generally because a new one has been started on the session. */ + void invalidate(); + } + + private final SpannerImpl spanner; + private final String name; + private SessionTransaction activeTransaction; + private ByteString readyTransactionId; + private final Map options; + + SessionImpl(SpannerImpl spanner, String name, Map options) { + this.spanner = spanner; + this.options = options; + this.name = checkNotNull(name); + } + + @Override + public String getName() { + return name; + } + + Map getOptions() { + return options; + } + + @Override + public long executePartitionedUpdate(Statement stmt) { + setActive(null); + PartitionedDMLTransaction txn = new PartitionedDMLTransaction(this, spanner.getRpc()); + return txn.executePartitionedUpdate(stmt); + } + + @Override + public Timestamp write(Iterable mutations) throws SpannerException { + TransactionRunner runner = readWriteTransaction(); + final Collection finalMutations = + mutations instanceof java.util.Collection + ? (Collection) mutations + : Lists.newArrayList(mutations); + runner.run( + new TransactionRunner.TransactionCallable() { + @Override + public Void run(TransactionContext ctx) { + ctx.buffer(finalMutations); + return null; + } + }); + return runner.getCommitTimestamp(); + } + + @Override + public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { + setActive(null); + List mutationsProto = new ArrayList<>(); + Mutation.toProto(mutations, mutationsProto); + final CommitRequest request = + CommitRequest.newBuilder() + .setSession(name) + .addAllMutations(mutationsProto) + .setSingleUseTransaction( + TransactionOptions.newBuilder() + .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())) + .build(); + Span span = tracer.spanBuilder(SpannerImpl.COMMIT).startSpan(); + try (Scope s = tracer.withSpan(span)) { + CommitResponse response = + SpannerImpl.runWithRetries( + new Callable() { + @Override + public CommitResponse call() throws Exception { + return spanner.getRpc().commit(request, options); + } + }); + Timestamp t = Timestamp.fromProto(response.getCommitTimestamp()); + span.end(); + return t; + } catch (IllegalArgumentException e) { + TraceUtil.endSpanWithFailure(span, e); + throw newSpannerException(ErrorCode.INTERNAL, "Could not parse commit timestamp", e); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; + } + } + + @Override + public ReadContext singleUse() { + return singleUse(TimestampBound.strong()); + } + + @Override + public ReadContext singleUse(TimestampBound bound) { + return setActive( + new SingleReadContext(this, bound, spanner.getRpc(), spanner.getDefaultPrefetchChunks())); + } + + @Override + public ReadOnlyTransaction singleUseReadOnlyTransaction() { + return singleUseReadOnlyTransaction(TimestampBound.strong()); + } + + @Override + public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) { + return setActive( + new SingleUseReadOnlyTransaction( + this, bound, spanner.getRpc(), spanner.getDefaultPrefetchChunks())); + } + + @Override + public ReadOnlyTransaction readOnlyTransaction() { + return readOnlyTransaction(TimestampBound.strong()); + } + + @Override + public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { + return setActive( + new MultiUseReadOnlyTransaction( + this, bound, spanner.getRpc(), spanner.getDefaultPrefetchChunks())); + } + + @Override + public TransactionRunner readWriteTransaction() { + return setActive( + new TransactionRunnerImpl(this, spanner.getRpc(), spanner.getDefaultPrefetchChunks())); + } + + @Override + public void prepareReadWriteTransaction() { + setActive(null); + readyTransactionId = beginTransaction(); + } + + @Override + public void close() { + Span span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION).startSpan(); + try (Scope s = tracer.withSpan(span)) { + SpannerImpl.runWithRetries( + new Callable() { + @Override + public Void call() throws Exception { + spanner.getRpc().deleteSession(name, options); + return null; + } + }); + span.end(); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; + } + } + + ByteString beginTransaction() { + Span span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION).startSpan(); + try (Scope s = tracer.withSpan(span)) { + final BeginTransactionRequest request = + BeginTransactionRequest.newBuilder() + .setSession(name) + .setOptions( + TransactionOptions.newBuilder() + .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())) + .build(); + Transaction txn = + SpannerImpl.runWithRetries( + new Callable() { + @Override + public Transaction call() throws Exception { + return spanner.getRpc().beginTransaction(request, options); + } + }); + if (txn.getId().isEmpty()) { + throw newSpannerException(ErrorCode.INTERNAL, "Missing id in transaction\n" + getName()); + } + span.end(); + return txn.getId(); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; + } + } + + TransactionContextImpl newTransaction() { + TransactionContextImpl txn = + new TransactionContextImpl( + this, readyTransactionId, spanner.getRpc(), spanner.getDefaultPrefetchChunks()); + return txn; + } + + T setActive(@Nullable T ctx) { + throwIfTransactionsPending(); + + if (activeTransaction != null) { + activeTransaction.invalidate(); + } + activeTransaction = ctx; + readyTransactionId = null; + return ctx; + } + + @Override + public TransactionManager transactionManager() { + return new TransactionManagerImpl(this); + } +} diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 909df2b00d1d..458659a4f9ae 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -28,29 +28,17 @@ import com.google.cloud.BaseService; import com.google.cloud.PageImpl; import com.google.cloud.PageImpl.NextPageFetcher; -import com.google.cloud.Timestamp; -import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction; -import com.google.cloud.spanner.AbstractReadContext.SingleReadContext; -import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction; -import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.cloud.spanner.spi.v1.SpannerRpc.Paginated; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.Any; -import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; -import com.google.spanner.v1.BeginTransactionRequest; -import com.google.spanner.v1.CommitRequest; -import com.google.spanner.v1.CommitResponse; -import com.google.spanner.v1.Transaction; -import com.google.spanner.v1.TransactionOptions; import io.grpc.Context; import io.opencensus.common.Scope; import io.opencensus.trace.AttributeValue; @@ -59,7 +47,6 @@ import io.opencensus.trace.Tracing; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -89,26 +76,12 @@ class SpannerImpl extends BaseService implements Spanner { private static final Tracer tracer = Tracing.getTracer(); private static final String CREATE_SESSION = "CloudSpannerOperation.CreateSession"; - private static final String DELETE_SESSION = "CloudSpannerOperation.DeleteSession"; - private static final String BEGIN_TRANSACTION = "CloudSpannerOperation.BeginTransaction"; + static final String DELETE_SESSION = "CloudSpannerOperation.DeleteSession"; + static final String BEGIN_TRANSACTION = "CloudSpannerOperation.BeginTransaction"; static final String COMMIT = "CloudSpannerOperation.Commit"; static final String QUERY = "CloudSpannerOperation.ExecuteStreamingQuery"; static final String READ = "CloudSpannerOperation.ExecuteStreamingRead"; - static final ThreadLocal hasPendingTransaction = - new ThreadLocal() { - @Override - protected Boolean initialValue() { - return false; - } - }; - - static void throwIfTransactionsPending() { - if (hasPendingTransaction.get() == Boolean.TRUE) { - throw newSpannerException(ErrorCode.INTERNAL, "Nested transactions are not supported"); - } - } - static { TraceUtil.exportSpans(CREATE_SESSION, DELETE_SESSION, BEGIN_TRANSACTION, COMMIT, QUERY, READ); } @@ -229,6 +202,16 @@ static T runWithRetries(Callable callable) { } } + /** Returns the {@link SpannerRpc} of this {@link SpannerImpl} instance. */ + SpannerRpc getRpc() { + return gapicRpc; + } + + /** Returns the default setting for prefetchChunks of this {@link SpannerImpl} instance. */ + int getDefaultPrefetchChunks() { + return defaultPrefetchChunks; + } + // TODO(user): change this to return SessionImpl and modify all corresponding references. Session createSession(final DatabaseId db) throws SpannerException { final Map options = @@ -245,7 +228,7 @@ public com.google.spanner.v1.Session call() throws Exception { } }); span.end(); - return new SessionImpl(session.getName(), options); + return new SessionImpl(this, session.getName(), options); } catch (RuntimeException e) { TraceUtil.endSpanWithFailure(span, e); throw e; @@ -255,7 +238,7 @@ public com.google.spanner.v1.Session call() throws Exception { SessionImpl sessionWithId(String name) { final Map options = SpannerImpl.optionMap(SessionOption.channelHint(random.nextLong())); - return new SessionImpl(name, options); + return new SessionImpl(this, name, options); } @Override @@ -400,211 +383,6 @@ void setNextPageToken(String nextPageToken) { abstract S fromProto(T proto); } - class SessionImpl implements Session { - private final String name; - private SessionTransaction activeTransaction; - private ByteString readyTransactionId; - private final Map options; - - SessionImpl(String name, Map options) { - this.options = options; - this.name = checkNotNull(name); - } - - @Override - public String getName() { - return name; - } - - Map getOptions() { - return options; - } - - @Override - public long executePartitionedUpdate(Statement stmt) { - setActive(null); - PartitionedDMLTransaction txn = new PartitionedDMLTransaction(this, gapicRpc); - return txn.executePartitionedUpdate(stmt); - } - - @Override - public Timestamp write(Iterable mutations) throws SpannerException { - TransactionRunner runner = readWriteTransaction(); - final Collection finalMutations = - mutations instanceof java.util.Collection - ? (Collection) mutations - : Lists.newArrayList(mutations); - runner.run( - new TransactionRunner.TransactionCallable() { - @Override - public Void run(TransactionContext ctx) { - ctx.buffer(finalMutations); - return null; - } - }); - return runner.getCommitTimestamp(); - } - - @Override - public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { - setActive(null); - List mutationsProto = new ArrayList<>(); - Mutation.toProto(mutations, mutationsProto); - final CommitRequest request = - CommitRequest.newBuilder() - .setSession(name) - .addAllMutations(mutationsProto) - .setSingleUseTransaction( - TransactionOptions.newBuilder() - .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())) - .build(); - Span span = tracer.spanBuilder(COMMIT).startSpan(); - try (Scope s = tracer.withSpan(span)) { - CommitResponse response = - runWithRetries( - new Callable() { - @Override - public CommitResponse call() throws Exception { - return gapicRpc.commit(request, options); - } - }); - Timestamp t = Timestamp.fromProto(response.getCommitTimestamp()); - span.end(); - return t; - } catch (IllegalArgumentException e) { - TraceUtil.endSpanWithFailure(span, e); - throw newSpannerException(ErrorCode.INTERNAL, "Could not parse commit timestamp", e); - } catch (RuntimeException e) { - TraceUtil.endSpanWithFailure(span, e); - throw e; - } - } - - @Override - public ReadContext singleUse() { - return singleUse(TimestampBound.strong()); - } - - @Override - public ReadContext singleUse(TimestampBound bound) { - return setActive(new SingleReadContext(this, bound, gapicRpc, defaultPrefetchChunks)); - } - - @Override - public ReadOnlyTransaction singleUseReadOnlyTransaction() { - return singleUseReadOnlyTransaction(TimestampBound.strong()); - } - - @Override - public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) { - return setActive( - new SingleUseReadOnlyTransaction(this, bound, gapicRpc, defaultPrefetchChunks)); - } - - @Override - public ReadOnlyTransaction readOnlyTransaction() { - return readOnlyTransaction(TimestampBound.strong()); - } - - @Override - public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { - return setActive( - new MultiUseReadOnlyTransaction(this, bound, gapicRpc, defaultPrefetchChunks)); - } - - @Override - public TransactionRunner readWriteTransaction() { - return setActive(new TransactionRunnerImpl(this, gapicRpc, defaultPrefetchChunks)); - } - - @Override - public void prepareReadWriteTransaction() { - setActive(null); - readyTransactionId = beginTransaction(); - } - - @Override - public void close() { - Span span = tracer.spanBuilder(DELETE_SESSION).startSpan(); - try (Scope s = tracer.withSpan(span)) { - runWithRetries( - new Callable() { - @Override - public Void call() throws Exception { - gapicRpc.deleteSession(name, options); - return null; - } - }); - span.end(); - } catch (RuntimeException e) { - TraceUtil.endSpanWithFailure(span, e); - throw e; - } - } - - ByteString beginTransaction() { - Span span = tracer.spanBuilder(BEGIN_TRANSACTION).startSpan(); - try (Scope s = tracer.withSpan(span)) { - final BeginTransactionRequest request = - BeginTransactionRequest.newBuilder() - .setSession(name) - .setOptions( - TransactionOptions.newBuilder() - .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())) - .build(); - Transaction txn = - runWithRetries( - new Callable() { - @Override - public Transaction call() throws Exception { - return gapicRpc.beginTransaction(request, options); - } - }); - if (txn.getId().isEmpty()) { - throw newSpannerException(ErrorCode.INTERNAL, "Missing id in transaction\n" + getName()); - } - span.end(); - return txn.getId(); - } catch (RuntimeException e) { - TraceUtil.endSpanWithFailure(span, e); - throw e; - } - } - - TransactionContextImpl newTransaction() { - TransactionContextImpl txn = - new TransactionContextImpl(this, readyTransactionId, gapicRpc, defaultPrefetchChunks); - return txn; - } - - T setActive(@Nullable T ctx) { - throwIfTransactionsPending(); - - if (activeTransaction != null) { - activeTransaction.invalidate(); - } - activeTransaction = ctx; - readyTransactionId = null; - return ctx; - } - - @Override - public TransactionManager transactionManager() { - return new TransactionManagerImpl(this); - } - } - - /** - * Represents a transaction within a session. "Transaction" here is used in the general sense, - * which covers standalone reads, standalone writes, single-use and multi-use read-only - * transactions, and read-write transactions. The defining characteristic is that a session may - * only have one such transaction active at a time. - */ - static interface SessionTransaction { - /** Invalidates the transaction, generally because a new one has been started on the session. */ - void invalidate(); - } - private enum DirectExecutor implements Executor { INSTANCE; diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java index 540eaca2c062..42c20455a184 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java @@ -17,8 +17,7 @@ package com.google.cloud.spanner; import com.google.cloud.Timestamp; -import com.google.cloud.spanner.SpannerImpl.SessionImpl; -import com.google.cloud.spanner.SpannerImpl.SessionTransaction; +import com.google.cloud.spanner.SessionImpl.SessionTransaction; import com.google.common.base.Preconditions; import io.opencensus.common.Scope; import io.opencensus.trace.Span; diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index de50fa083212..afe90d893f64 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -23,8 +23,7 @@ import com.google.api.client.util.BackOff; import com.google.cloud.Timestamp; -import com.google.cloud.spanner.SpannerImpl.SessionImpl; -import com.google.cloud.spanner.SpannerImpl.SessionTransaction; +import com.google.cloud.spanner.SessionImpl.SessionTransaction; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; @@ -322,7 +321,7 @@ public TransactionRunner allowNestedTransaction() { public T run(TransactionCallable callable) { try (Scope s = tracer.withSpan(span)) { if (blockNestedTxn) { - SpannerImpl.hasPendingTransaction.set(Boolean.TRUE); + SessionImpl.hasPendingTransaction.set(Boolean.TRUE); } return runInternal(callable); @@ -333,7 +332,7 @@ public T run(TransactionCallable callable) { // Remove threadLocal rather than set to FALSE to avoid a possible memory leak. // We also do this unconditionally in case a user has modified the flag when the transaction // was running. - SpannerImpl.hasPendingTransaction.remove(); + SessionImpl.hasPendingTransaction.remove(); } } diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index 169d2177228b..919a0026ed31 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.fail; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.protobuf.ByteString; import com.google.protobuf.ListValue; @@ -66,9 +67,11 @@ public class SessionImplTest { @Captor private ArgumentCaptor> optionsCaptor; private Map options; + @SuppressWarnings("unchecked") @Before public void setUp() { MockitoAnnotations.initMocks(this); + @SuppressWarnings("resource") SpannerImpl spanner = new SpannerImpl(rpc, 1, spannerOptions); String dbName = "projects/p1/instances/i1/databases/d1"; String sessionName = dbName + "/sessions/s1"; @@ -81,10 +84,109 @@ public void setUp() { Mockito.anyMapOf(String.class, String.class), optionsCaptor.capture())) .thenReturn(sessionProto); + Transaction txn = Transaction.newBuilder().setId(ByteString.copyFromUtf8("TEST")).build(); + Mockito.when( + rpc.beginTransaction( + Mockito.any(BeginTransactionRequest.class), Mockito.any(Map.class))) + .thenReturn(txn); + CommitResponse commitResponse = + CommitResponse.newBuilder() + .setCommitTimestamp(com.google.protobuf.Timestamp.getDefaultInstance()) + .build(); + Mockito.when(rpc.commit(Mockito.any(CommitRequest.class), Mockito.any(Map.class))) + .thenReturn(commitResponse); session = spanner.createSession(db); // We expect the same options, "options", on all calls on "session". options = optionsCaptor.getValue(); - Mockito.reset(rpc); + } + + private void doNestedRwTransaction() { + session + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws SpannerException { + session + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + return null; + } + }); + + return null; + } + }); + } + + @Test + public void nestedReadWriteTxnThrows() { + try { + doNestedRwTransaction(); + fail("Expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL); + assertThat(e.getMessage()).contains("not supported"); + } + } + + @Test + public void nestedReadOnlyTxnThrows() { + try { + session + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws SpannerException { + session.readOnlyTransaction().getReadTimestamp(); + + return null; + } + }); + fail("Expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL); + assertThat(e.getMessage()).contains("not supported"); + } + } + + @Test + public void nestedSingleUseReadTxnThrows() { + try { + session + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws SpannerException { + session.singleUseReadOnlyTransaction(); + return null; + } + }); + fail("Expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL); + assertThat(e.getMessage()).contains("not supported"); + } + } + + @Test + public void nestedTxnSucceedsWhenAllowed() { + session + .readWriteTransaction() + .allowNestedTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws SpannerException { + session.singleUseReadOnlyTransaction(); + return null; + } + }); } @Test diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java index b12c7238ebf4..1d7904fd0784 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java @@ -24,7 +24,6 @@ import static org.mockito.MockitoAnnotations.initMocks; import com.google.cloud.Timestamp; -import com.google.cloud.spanner.SpannerImpl.SessionImpl; import com.google.cloud.spanner.TransactionManager.TransactionState; import org.junit.Before; import org.junit.Rule; diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index cf9350a5e812..2de9882187f2 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -43,7 +43,7 @@ @RunWith(JUnit4.class) public class TransactionRunnerImplTest { @Mock private SpannerRpc rpc; - @Mock private SpannerImpl.SessionImpl session; + @Mock private SessionImpl session; @Mock private TransactionRunnerImpl.Sleeper sleeper; @Mock private TransactionRunnerImpl.TransactionContextImpl txn; private TransactionRunnerImpl transactionRunner;