Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
fabbb15
More fixes
vkedia Nov 17, 2017
3d40e30
Fixed findbugs version
vkedia Nov 29, 2017
14fb803
Merge branch 'master' into tracing
vkedia Nov 29, 2017
b62dee7
Add tracing to cloud spanner client
vkedia Dec 1, 2017
027e3fb
Fix version of findbugs
vkedia Dec 2, 2017
2804e1b
Made changes per code review comments
vkedia Dec 16, 2017
b118cf2
Merge remote-tracking branch 'upstream/master' into tracing
vkedia Dec 16, 2017
9d06238
Fixed dependencies
vkedia Dec 16, 2017
01c1085
Fixes style issues
vkedia Dec 16, 2017
c2c6240
Removed empty spaces
vkedia Dec 16, 2017
8fe2ba2
Fixed NPE
vkedia Dec 18, 2017
37a54fb
Handles the case of null SampledSpanStore
vkedia Dec 19, 2017
b369d9e
Changed error_prone version to 2.1.2
vkedia Dec 20, 2017
b279588
Changed opencensus version to 0.9.1
vkedia Dec 20, 2017
755bf5d
Reverts the change to eclipse prefs file
vkedia Dec 20, 2017
4fdd48a
Merge remote-tracking branch 'upstream/master' into tracing
vkedia Dec 20, 2017
705c773
Merge remote-tracking branch 'upstream/master' into txn-api
vkedia Jan 11, 2018
74bff14
Interface for manual transaction manager
vkedia Jan 12, 2018
9dc4df9
More updates
vkedia Feb 27, 2018
7b0b2e0
Merge remote-tracking branch 'upstream/master' into txn-api
vkedia Mar 8, 2018
be0e510
Tests for transaction manager.
vkedia Apr 12, 2018
1ce2470
Merge remote-tracking branch 'upstream/master' into txn-api
vkedia Apr 12, 2018
7ca9a0f
Indentation fix
vkedia Apr 12, 2018
899d3df
Fixes TransactionRunner tests and adds 2 more tests
vkedia Apr 12, 2018
e01a554
Addresses codacy comments
vkedia Apr 12, 2018
e15694b
Added snippets in Javadoc
vkedia Apr 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
package com.google.cloud.examples.spanner.snippets;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbortedException;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.ReadOnlyTransaction;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.TransactionManager;
import com.google.cloud.spanner.TransactionRunner;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import java.util.Collections;
Expand Down Expand Up @@ -222,4 +224,31 @@ public Void run(TransactionContext transaction) throws Exception {
});
// [END readWriteTransaction]
}

/**
* Example of using {@link TransactionManager}.
*/
// [TARGET transactionManager()]
// [VARIABLE my_singer_id]
public void transactionManager(final long singerId) throws InterruptedException {
// [START transactionManager]
try (TransactionManager manager = dbClient.transactionManager()) {
TransactionContext txn = manager.begin();
while (true) {
String column = "FirstName";
Struct row = txn.readRow("Singers", Key.of(singerId), Collections.singleton(column));
String name = row.getString(column);
txn.buffer(
Mutation.newUpdateBuilder("Singers").set(column).to(name.toUpperCase()).build());
try {
manager.commit();
break;
} catch (AbortedException e) {
Thread.sleep(e.getRetryDelayInMillis() / 1000);
txn = manager.resetForRetry();
}
}
}
// [END transactionManager]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,56 @@ public interface DatabaseClient {
* });
* </code></pre>
*
* <p>Example of a read write transaction.
* <pre> {@code
* long singerId = my_singer_id;
* TransactionRunner runner = dbClient.readWriteTransaction();
* runner.run(
* new TransactionCallable<Void>() {
*
* @Override
* public Void run(TransactionContext transaction) throws Exception {
* String column = "FirstName";
* Struct row =
* transaction.readRow("Singers", Key.of(singerId), Collections.singleton(column));
* String name = row.getString(column);
* transaction.buffer(
* Mutation.newUpdateBuilder("Singers").set(column).to(name.toUpperCase()).build());
* return null;
* }
* });
* }</pre>
*
*/
TransactionRunner readWriteTransaction();

/**
* Returns a transaction manager which allows manual management of transaction lifecycle. This
* API is meant for advanced users. Most users should instead use the
* {@link #readWriteTransaction()} API instead.
*
* <p>Example of using {@link TransactionManager}.
* <pre> {@code
* long singerId = my_singer_id;
* try (TransactionManager manager = dbClient.transactionManager()) {
* TransactionContext txn = manager.begin();
* while (true) {
* String column = "FirstName";
* Struct row = txn.readRow("Singers", Key.of(singerId), Collections.singleton(column));
* String name = row.getString(column);
* txn.buffer(
* Mutation.newUpdateBuilder("Singers").set(column).to(name.toUpperCase()).build());
* try {
* manager.commit();
* break;
* } catch (AbortedException e) {
* Thread.sleep(e.getRetryDelayInMillis() / 1000);
* txn = manager.resetForRetry();
* }
* }
* }
* }</pre>
*
*/
TransactionManager transactionManager();
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,17 @@ public TransactionRunner readWriteTransaction() {
}
}

@Override
public TransactionManager transactionManager() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadWriteSession().transactionManager();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

ListenableFuture<Void> closeAsync() {
return pool.closeAsync();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,65 @@ public Timestamp getReadTimestamp() {
return txn.getReadTimestamp();
}
}

private static class AutoClosingTransactionManager implements TransactionManager {
final TransactionManager delegate;
final PooledSession session;

AutoClosingTransactionManager(TransactionManager delegate, PooledSession session) {
this.delegate = delegate;
this.session = session;
}

@Override
public TransactionContext begin() {
return delegate.begin();
}

@Override
public void commit() {
try {
delegate.commit();
} finally {
if (getState() != TransactionState.ABORTED) {
close();
}
}
}

@Override
public void rollback() {
try {
delegate.rollback();
} finally {
close();
}
}

@Override
public TransactionContext resetForRetry() {
return delegate.resetForRetry();
}

@Override
public Timestamp getCommitTimestamp() {
return delegate.getCommitTimestamp();
}

@Override
public void close() {
try {
delegate.close();
} finally {
session.close();
}
}

@Override
public TransactionState getState() {
return delegate.getState();
}
}

// Exception class used just to track the stack trace at the point when a session was handed out
// from the pool.
Expand Down Expand Up @@ -386,6 +445,12 @@ private void keepAlive() {
private void markUsed() {
lastUseTime = clock.instant();
}

@Override
public TransactionManager transactionManager() {
markUsed();
return new AutoClosingTransactionManager(delegate.transactionManager(), this);
}
}

private static final class SessionOrError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,14 +898,25 @@ public Transaction call() throws Exception {
}
}

private <T extends SessionTransaction> T setActive(@Nullable T ctx) {
TransactionContextImpl newTransaction() {
TransactionContextImpl txn = new TransactionContextImpl(this, readyTransactionId, rpc,
defaultPrefetchChunks);
return txn;
}

<T extends SessionTransaction> T setActive(@Nullable T ctx) {
if (activeTransaction != null) {
activeTransaction.invalidate();
}
activeTransaction = ctx;
readyTransactionId = null;
return ctx;
}

@Override
public TransactionManager transactionManager() {
return new TransactionManagerImpl(this);
}
}

/**
Expand All @@ -914,7 +925,7 @@ private <T extends SessionTransaction> T setActive(@Nullable T ctx) {
* transactions, and read-write transactions. The defining characteristic is that a session may
* only have one such transaction active at a time.
*/
private interface SessionTransaction {
static interface SessionTransaction {
/** Invalidates the transaction, generally because a new one has been started on the session. */
void invalidate();
}
Expand Down Expand Up @@ -1018,7 +1029,7 @@ ResultSet executeQueryInternalWithOptions(
ExecuteSqlRequest.newBuilder()
.setSql(statement.getSql())
.setQueryMode(queryMode)
.setSession(session.name);
.setSession(session.getName());
Map<String, Value> stmtParameters = statement.getParameters();
if (!stmtParameters.isEmpty()) {
com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder();
Expand Down Expand Up @@ -1217,10 +1228,7 @@ void backoffSleep(Context context, long backoffMillis) {
this.session = session;
this.sleeper = sleeper;
this.span = Tracing.getTracer().getCurrentSpan();
ByteString transactionId = session.readyTransactionId;
session.readyTransactionId = null;
this.txn = new TransactionContextImpl(session, transactionId, rpc, defaultPrefetchChunks,
span);
this.txn = session.newTransaction();
}

TransactionRunnerImpl(SessionImpl session, SpannerRpc rpc, int defaultPrefetchChunks) {
Expand All @@ -1230,7 +1238,7 @@ void backoffSleep(Context context, long backoffMillis) {
@Nullable
@Override
public <T> T run(TransactionCallable<T> callable) {
try {
try (Scope s = tracer.withSpan(span)) {
return runInternal(callable);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
Expand All @@ -1244,6 +1252,7 @@ private <T> T runInternal(TransactionCallable<T> callable) {
BackOff backoff = newBackOff();
final Context context = Context.current();
int attempt = 0;
// TODO: Change this to use TransactionManager.
while (true) {
checkState(
isValid, "TransactionRunner has been invalidated by a new operation on the session");
Expand Down Expand Up @@ -1318,7 +1327,7 @@ public void invalidate() {

private void backoff(Context context, BackOff backoff) {
long delay = txn.getRetryDelayInMillis(backoff);
txn = new TransactionContextImpl(session, null, txn.rpc, txn.defaultPrefetchChunks, span);
txn = session.newTransaction();
span.addAnnotation("Backing off",
ImmutableMap.of("Delay", AttributeValue.longAttributeValue(delay)));
sleeper.backoffSleep(context, delay);
Expand All @@ -1344,9 +1353,8 @@ static class TransactionContextImpl extends AbstractReadContext implements Trans
SessionImpl session,
@Nullable ByteString transactionId,
SpannerRpc rpc,
int defaultPrefetchChunks,
Span span) {
super(session, rpc, defaultPrefetchChunks, span);
int defaultPrefetchChunks) {
super(session, rpc, defaultPrefetchChunks);
this.transactionId = transactionId;
}

Expand Down
Loading