Skip to content

Commit be7ece5

Browse files
olavloitekolea2
authored andcommitted
Spanner: Refactor SpannerImpl - Move SessionImpl to separate file (#4895)
* refactor SpannerImpl: move SessionImpl to separate file * fix pending transactions flag * fixed merge conflicts * changed references after rebase on master
1 parent 262b5af commit be7ece5

10 files changed

Lines changed: 402 additions & 251 deletions

File tree

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@
2828
import com.google.cloud.spanner.AbstractResultSet.ResumableStreamIterator;
2929
import com.google.cloud.spanner.Options.QueryOption;
3030
import com.google.cloud.spanner.Options.ReadOption;
31-
import com.google.cloud.spanner.SpannerImpl.SessionImpl;
32-
import com.google.cloud.spanner.SpannerImpl.SessionTransaction;
31+
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
3332
import com.google.cloud.spanner.spi.v1.SpannerRpc;
3433
import com.google.protobuf.ByteString;
3534
import com.google.spanner.v1.BeginTransactionRequest;
@@ -205,7 +204,7 @@ ByteString getTransactionId() {
205204
}
206205

207206
void initTransaction() {
208-
SpannerImpl.throwIfTransactionsPending();
207+
SessionImpl.throwIfTransactionsPending();
209208

210209
// Since we only support synchronous calls, just block on "txnLock" while the RPC is in
211210
// flight. Note that we use the strategy of sending an explicit BeginTransaction() RPC,

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
2222
import com.google.cloud.spanner.Options.QueryOption;
2323
import com.google.cloud.spanner.Options.ReadOption;
24-
import com.google.cloud.spanner.SpannerImpl.SessionImpl;
2524
import com.google.cloud.spanner.spi.v1.SpannerRpc;
2625
import com.google.common.base.Preconditions;
2726
import com.google.common.collect.ImmutableList;

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818

1919
import static com.google.common.base.Preconditions.checkState;
2020

21-
import com.google.cloud.spanner.SpannerImpl.SessionImpl;
22-
import com.google.cloud.spanner.SpannerImpl.SessionTransaction;
21+
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
2322
import com.google.cloud.spanner.spi.v1.SpannerRpc;
2423
import com.google.protobuf.ByteString;
2524
import com.google.spanner.v1.BeginTransactionRequest;
Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
/*
2+
* Copyright 2019 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
20+
import static com.google.common.base.Preconditions.checkNotNull;
21+
22+
import com.google.cloud.Timestamp;
23+
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
24+
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
25+
import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction;
26+
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
27+
import com.google.cloud.spanner.spi.v1.SpannerRpc;
28+
import com.google.common.collect.Lists;
29+
import com.google.protobuf.ByteString;
30+
import com.google.spanner.v1.BeginTransactionRequest;
31+
import com.google.spanner.v1.CommitRequest;
32+
import com.google.spanner.v1.CommitResponse;
33+
import com.google.spanner.v1.Transaction;
34+
import com.google.spanner.v1.TransactionOptions;
35+
import io.opencensus.common.Scope;
36+
import io.opencensus.trace.Span;
37+
import io.opencensus.trace.Tracer;
38+
import io.opencensus.trace.Tracing;
39+
import java.util.ArrayList;
40+
import java.util.Collection;
41+
import java.util.List;
42+
import java.util.Map;
43+
import java.util.concurrent.Callable;
44+
import javax.annotation.Nullable;
45+
46+
/**
47+
* Implementation of {@link Session}. Sessions are managed internally by the client library, and
48+
* users need not be aware of the actual session management, pooling and handling.
49+
*/
50+
class SessionImpl implements Session {
51+
private static final Tracer tracer = Tracing.getTracer();
52+
53+
/** Keep track of running transactions on this session per thread. */
54+
static final ThreadLocal<Boolean> hasPendingTransaction =
55+
new ThreadLocal<Boolean>() {
56+
@Override
57+
protected Boolean initialValue() {
58+
return false;
59+
}
60+
};
61+
62+
static void throwIfTransactionsPending() {
63+
if (hasPendingTransaction.get() == Boolean.TRUE) {
64+
throw newSpannerException(ErrorCode.INTERNAL, "Nested transactions are not supported");
65+
}
66+
}
67+
68+
/**
69+
* Represents a transaction within a session. "Transaction" here is used in the general sense,
70+
* which covers standalone reads, standalone writes, single-use and multi-use read-only
71+
* transactions, and read-write transactions. The defining characteristic is that a session may
72+
* only have one such transaction active at a time.
73+
*/
74+
static interface SessionTransaction {
75+
/** Invalidates the transaction, generally because a new one has been started on the session. */
76+
void invalidate();
77+
}
78+
79+
private final SpannerImpl spanner;
80+
private final String name;
81+
private SessionTransaction activeTransaction;
82+
private ByteString readyTransactionId;
83+
private final Map<SpannerRpc.Option, ?> options;
84+
85+
SessionImpl(SpannerImpl spanner, String name, Map<SpannerRpc.Option, ?> options) {
86+
this.spanner = spanner;
87+
this.options = options;
88+
this.name = checkNotNull(name);
89+
}
90+
91+
@Override
92+
public String getName() {
93+
return name;
94+
}
95+
96+
Map<SpannerRpc.Option, ?> getOptions() {
97+
return options;
98+
}
99+
100+
@Override
101+
public long executePartitionedUpdate(Statement stmt) {
102+
setActive(null);
103+
PartitionedDMLTransaction txn = new PartitionedDMLTransaction(this, spanner.getRpc());
104+
return txn.executePartitionedUpdate(stmt);
105+
}
106+
107+
@Override
108+
public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
109+
TransactionRunner runner = readWriteTransaction();
110+
final Collection<Mutation> finalMutations =
111+
mutations instanceof java.util.Collection<?>
112+
? (Collection<Mutation>) mutations
113+
: Lists.newArrayList(mutations);
114+
runner.run(
115+
new TransactionRunner.TransactionCallable<Void>() {
116+
@Override
117+
public Void run(TransactionContext ctx) {
118+
ctx.buffer(finalMutations);
119+
return null;
120+
}
121+
});
122+
return runner.getCommitTimestamp();
123+
}
124+
125+
@Override
126+
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
127+
setActive(null);
128+
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
129+
Mutation.toProto(mutations, mutationsProto);
130+
final CommitRequest request =
131+
CommitRequest.newBuilder()
132+
.setSession(name)
133+
.addAllMutations(mutationsProto)
134+
.setSingleUseTransaction(
135+
TransactionOptions.newBuilder()
136+
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
137+
.build();
138+
Span span = tracer.spanBuilder(SpannerImpl.COMMIT).startSpan();
139+
try (Scope s = tracer.withSpan(span)) {
140+
CommitResponse response =
141+
SpannerImpl.runWithRetries(
142+
new Callable<CommitResponse>() {
143+
@Override
144+
public CommitResponse call() throws Exception {
145+
return spanner.getRpc().commit(request, options);
146+
}
147+
});
148+
Timestamp t = Timestamp.fromProto(response.getCommitTimestamp());
149+
span.end();
150+
return t;
151+
} catch (IllegalArgumentException e) {
152+
TraceUtil.endSpanWithFailure(span, e);
153+
throw newSpannerException(ErrorCode.INTERNAL, "Could not parse commit timestamp", e);
154+
} catch (RuntimeException e) {
155+
TraceUtil.endSpanWithFailure(span, e);
156+
throw e;
157+
}
158+
}
159+
160+
@Override
161+
public ReadContext singleUse() {
162+
return singleUse(TimestampBound.strong());
163+
}
164+
165+
@Override
166+
public ReadContext singleUse(TimestampBound bound) {
167+
return setActive(
168+
new SingleReadContext(this, bound, spanner.getRpc(), spanner.getDefaultPrefetchChunks()));
169+
}
170+
171+
@Override
172+
public ReadOnlyTransaction singleUseReadOnlyTransaction() {
173+
return singleUseReadOnlyTransaction(TimestampBound.strong());
174+
}
175+
176+
@Override
177+
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
178+
return setActive(
179+
new SingleUseReadOnlyTransaction(
180+
this, bound, spanner.getRpc(), spanner.getDefaultPrefetchChunks()));
181+
}
182+
183+
@Override
184+
public ReadOnlyTransaction readOnlyTransaction() {
185+
return readOnlyTransaction(TimestampBound.strong());
186+
}
187+
188+
@Override
189+
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
190+
return setActive(
191+
new MultiUseReadOnlyTransaction(
192+
this, bound, spanner.getRpc(), spanner.getDefaultPrefetchChunks()));
193+
}
194+
195+
@Override
196+
public TransactionRunner readWriteTransaction() {
197+
return setActive(
198+
new TransactionRunnerImpl(this, spanner.getRpc(), spanner.getDefaultPrefetchChunks()));
199+
}
200+
201+
@Override
202+
public void prepareReadWriteTransaction() {
203+
setActive(null);
204+
readyTransactionId = beginTransaction();
205+
}
206+
207+
@Override
208+
public void close() {
209+
Span span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION).startSpan();
210+
try (Scope s = tracer.withSpan(span)) {
211+
SpannerImpl.runWithRetries(
212+
new Callable<Void>() {
213+
@Override
214+
public Void call() throws Exception {
215+
spanner.getRpc().deleteSession(name, options);
216+
return null;
217+
}
218+
});
219+
span.end();
220+
} catch (RuntimeException e) {
221+
TraceUtil.endSpanWithFailure(span, e);
222+
throw e;
223+
}
224+
}
225+
226+
ByteString beginTransaction() {
227+
Span span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION).startSpan();
228+
try (Scope s = tracer.withSpan(span)) {
229+
final BeginTransactionRequest request =
230+
BeginTransactionRequest.newBuilder()
231+
.setSession(name)
232+
.setOptions(
233+
TransactionOptions.newBuilder()
234+
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
235+
.build();
236+
Transaction txn =
237+
SpannerImpl.runWithRetries(
238+
new Callable<Transaction>() {
239+
@Override
240+
public Transaction call() throws Exception {
241+
return spanner.getRpc().beginTransaction(request, options);
242+
}
243+
});
244+
if (txn.getId().isEmpty()) {
245+
throw newSpannerException(ErrorCode.INTERNAL, "Missing id in transaction\n" + getName());
246+
}
247+
span.end();
248+
return txn.getId();
249+
} catch (RuntimeException e) {
250+
TraceUtil.endSpanWithFailure(span, e);
251+
throw e;
252+
}
253+
}
254+
255+
TransactionContextImpl newTransaction() {
256+
TransactionContextImpl txn =
257+
new TransactionContextImpl(
258+
this, readyTransactionId, spanner.getRpc(), spanner.getDefaultPrefetchChunks());
259+
return txn;
260+
}
261+
262+
<T extends SessionTransaction> T setActive(@Nullable T ctx) {
263+
throwIfTransactionsPending();
264+
265+
if (activeTransaction != null) {
266+
activeTransaction.invalidate();
267+
}
268+
activeTransaction = ctx;
269+
readyTransactionId = null;
270+
return ctx;
271+
}
272+
273+
@Override
274+
public TransactionManager transactionManager() {
275+
return new TransactionManagerImpl(this);
276+
}
277+
}

0 commit comments

Comments
 (0)