Skip to content
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
fabbb15
More fixes
vkedia Nov 17, 2017
3d40e30
Fixed findbugs version
vkedia Nov 29, 2017
14fb803
Merge branch 'master' into tracing
vkedia Nov 29, 2017
b62dee7
Add tracing to cloud spanner client
vkedia Dec 1, 2017
027e3fb
Fix version of findbugs
vkedia Dec 2, 2017
2804e1b
Made changes per code review comments
vkedia Dec 16, 2017
b118cf2
Merge remote-tracking branch 'upstream/master' into tracing
vkedia Dec 16, 2017
9d06238
Fixed dependencies
vkedia Dec 16, 2017
01c1085
Fixes style issues
vkedia Dec 16, 2017
c2c6240
Removed empty spaces
vkedia Dec 16, 2017
8fe2ba2
Fixed NPE
vkedia Dec 18, 2017
37a54fb
Handles the case of null SampledSpanStore
vkedia Dec 19, 2017
b369d9e
Changed error_prone version to 2.1.2
vkedia Dec 20, 2017
b279588
Changed opencensus version to 0.9.1
vkedia Dec 20, 2017
755bf5d
Reverts the change to eclipse prefs file
vkedia Dec 20, 2017
4fdd48a
Merge remote-tracking branch 'upstream/master' into tracing
vkedia Dec 20, 2017
705c773
Merge remote-tracking branch 'upstream/master' into txn-api
vkedia Jan 11, 2018
74bff14
Interface for manual transaction manager
vkedia Jan 12, 2018
9dc4df9
More updates
vkedia Feb 27, 2018
7b0b2e0
Merge remote-tracking branch 'upstream/master' into txn-api
vkedia Mar 8, 2018
be0e510
Tests for transaction manager.
vkedia Apr 12, 2018
1ce2470
Merge remote-tracking branch 'upstream/master' into txn-api
vkedia Apr 12, 2018
7ca9a0f
Indentation fix
vkedia Apr 12, 2018
899d3df
Fixes TransactionRunner tests and adds 2 more tests
vkedia Apr 12, 2018
e01a554
Addresses codacy comments
vkedia Apr 12, 2018
e15694b
Added snippets in Javadoc
vkedia Apr 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
package com.google.cloud.examples.spanner.snippets;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbortedException;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.ReadOnlyTransaction;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.TransactionManager;
import com.google.cloud.spanner.TransactionRunner;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import java.util.Collections;
Expand Down Expand Up @@ -222,4 +224,31 @@ public Void run(TransactionContext transaction) throws Exception {
});
// [END readWriteTransaction]
}

/**
* Example of using {@link TransactionManager}.
*/
// [TARGET transactionManager()]
// [VARIABLE my_singer_id]
public void transactionManager(final long singerId) throws InterruptedException {
// [START transactionManager]
try (TransactionManager manager = dbClient.transactionManager()) {
TransactionContext txn = manager.begin();
while (true) {
String column = "FirstName";
Struct row = txn.readRow("Singers", Key.of(singerId), Collections.singleton(column));
String name = row.getString(column);
txn.buffer(
Mutation.newUpdateBuilder("Singers").set(column).to(name.toUpperCase()).build());
try {
manager.commit();
break;
} catch (AbortedException e) {
Thread.sleep(e.getRetryDelayInMillis() / 1000);
txn = manager.resetForRetry();
}
}
}
// [END transactionManager]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,11 @@ public interface DatabaseClient {
*
*/
TransactionRunner readWriteTransaction();

/**
* Returns a transaction manager which allows manual management of transaction lifecycle. This
* API is meant for advanced users. Most users should instead use the
* {@link #readWriteTransaction()} API instead.
*/
TransactionManager transactionManager();

This comment was marked as spam.

}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,17 @@ public TransactionRunner readWriteTransaction() {
}
}

@Override
public TransactionManager transactionManager() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return pool.getReadWriteSession().transactionManager();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

ListenableFuture<Void> closeAsync() {
return pool.closeAsync();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,65 @@ public Timestamp getReadTimestamp() {
return txn.getReadTimestamp();
}
}

private static class AutoClosingTransactionManager implements TransactionManager {
final TransactionManager delegate;
final PooledSession session;

AutoClosingTransactionManager(TransactionManager delegate, PooledSession session) {
this.delegate = delegate;
this.session = session;
}

@Override
public TransactionContext begin() {
return delegate.begin();
}

@Override
public void commit() {
try {
delegate.commit();
} finally {
if (getState() != TransactionState.ABORTED) {
close();
}
}
}

@Override
public void rollback() {
try {
delegate.rollback();
} finally {
close();
}
}

@Override
public TransactionContext resetForRetry() {
return delegate.resetForRetry();
}

@Override
public Timestamp getCommitTimestamp() {
return delegate.getCommitTimestamp();
}

@Override
public void close() {
try {
delegate.close();
} finally {
session.close();
}
}

@Override
public TransactionState getState() {
return delegate.getState();
}
}

// Exception class used just to track the stack trace at the point when a session was handed out
// from the pool.
Expand Down Expand Up @@ -386,6 +445,12 @@ private void keepAlive() {
private void markUsed() {
lastUseTime = clock.instant();
}

@Override
public TransactionManager transactionManager() {
markUsed();
return new AutoClosingTransactionManager(delegate.transactionManager(), this);
}
}

private static final class SessionOrError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,14 +898,25 @@ public Transaction call() throws Exception {
}
}

private <T extends SessionTransaction> T setActive(@Nullable T ctx) {
TransactionContextImpl newTransaction() {
TransactionContextImpl txn = new TransactionContextImpl(this, readyTransactionId, rpc,
defaultPrefetchChunks);
return txn;
}

<T extends SessionTransaction> T setActive(@Nullable T ctx) {
if (activeTransaction != null) {
activeTransaction.invalidate();
}
activeTransaction = ctx;
readyTransactionId = null;
return ctx;
}

@Override
public TransactionManager transactionManager() {
return new TransactionManagerImpl(this);
}
}

/**
Expand All @@ -914,7 +925,7 @@ private <T extends SessionTransaction> T setActive(@Nullable T ctx) {
* transactions, and read-write transactions. The defining characteristic is that a session may
* only have one such transaction active at a time.
*/
private interface SessionTransaction {
static interface SessionTransaction {
/** Invalidates the transaction, generally because a new one has been started on the session. */
void invalidate();
}
Expand Down Expand Up @@ -1018,7 +1029,7 @@ ResultSet executeQueryInternalWithOptions(
ExecuteSqlRequest.newBuilder()
.setSql(statement.getSql())
.setQueryMode(queryMode)
.setSession(session.name);
.setSession(session.getName());
Map<String, Value> stmtParameters = statement.getParameters();
if (!stmtParameters.isEmpty()) {
com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder();
Expand Down Expand Up @@ -1217,10 +1228,7 @@ void backoffSleep(Context context, long backoffMillis) {
this.session = session;
this.sleeper = sleeper;
this.span = Tracing.getTracer().getCurrentSpan();
ByteString transactionId = session.readyTransactionId;
session.readyTransactionId = null;
this.txn = new TransactionContextImpl(session, transactionId, rpc, defaultPrefetchChunks,
span);
this.txn = session.newTransaction();
}

TransactionRunnerImpl(SessionImpl session, SpannerRpc rpc, int defaultPrefetchChunks) {
Expand All @@ -1230,7 +1238,7 @@ void backoffSleep(Context context, long backoffMillis) {
@Nullable
@Override
public <T> T run(TransactionCallable<T> callable) {
try {
try (Scope s = tracer.withSpan(span)) {
return runInternal(callable);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
Expand Down Expand Up @@ -1318,7 +1326,7 @@ public void invalidate() {

private void backoff(Context context, BackOff backoff) {
long delay = txn.getRetryDelayInMillis(backoff);
txn = new TransactionContextImpl(session, null, txn.rpc, txn.defaultPrefetchChunks, span);
txn = session.newTransaction();
span.addAnnotation("Backing off",
ImmutableMap.of("Delay", AttributeValue.longAttributeValue(delay)));
sleeper.backoffSleep(context, delay);
Expand All @@ -1344,9 +1352,8 @@ static class TransactionContextImpl extends AbstractReadContext implements Trans
SessionImpl session,
@Nullable ByteString transactionId,
SpannerRpc rpc,
int defaultPrefetchChunks,
Span span) {
super(session, rpc, defaultPrefetchChunks, span);
int defaultPrefetchChunks) {
super(session, rpc, defaultPrefetchChunks);
this.transactionId = transactionId;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2017 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.cloud.Timestamp;

/**
* An interface for managing the life cycle of a read write transaction including all its retries.
* See {@link TransactionContext} for a description of transaction semantics.
*
* <p>At any point in time there can be at most one active transaction in this manager. When that
* transaction is committed, if it fails with an {@code ABORTED} error, calling
* {@link #resetForRetry()} would create a new {@link TransactionContext}. The newly created
* transaction would use the same session thus increasing its lock priority. If the transaction is
* committed successfully, or is rolled back or commit fails with any error other than
* {@code ABORTED}, the manager is considered complete and no further transactions are allowed to be
* created in it.
*
* <p>Every {@code TransactionManager} should either be committed or rolled back. Failure to do so
* can cause resources to be leaked and deadlocks. Easiest way to guarantee this is by calling
* {@link #close()} in a finally block.
*
* @see DatabaseClient#transactionManager()
*/
public interface TransactionManager extends AutoCloseable {

/**
* State of the transaction manager.
*/
public enum TransactionState {
// Transaction has been started either by calling {@link #begin()} or via
// {@link resetForRetry()} but has not been commited or rolled back yet.
STARTED,
// Transaction was sucessfully committed. This is a terminal state.
COMMITTED,
// Transaction failed during commit with an error other than ABORTED. Transaction cannot be
// retried in this state. This is a terminal state.
COMMIT_FAILED,
// Transaction failed during commit with ABORTED and can be retried.
ABORTED,
// Transaction was rolled back. This is a terminal state.
ROLLED_BACK
}

/**
* Creates a new read write transaction. This must be called before doing any other operation and
* can only be called once. To create a new transaction for subsequent retries, see
* {@link #resetForRetry()}.
*/
TransactionContext begin();

/**
* Commits the currently active transaction. If the transaction was already aborted, then this
* would throw an {@link AbortedException}.
*/
void commit();

/**
* Rolls back the currently active transaction. In most cases there should be no need to call this
* explicitly since {@link #close()} would automatically roll back any active transaction.
*/
void rollback();

/**
* Creates a new transaction for retry. This should only be called if the previous transaction
* failed with {@code ABORTED}. In all other cases, this will throw an
* {@link IllegalStateException}. Users should backoff before calling this method. Backoff delay
* is specified by {@link SpannerException#getRetryDelayInMillis()} on the
* {@code SpannerException} throw by the previous commit call.
*/
TransactionContext resetForRetry();

This comment was marked as spam.

This comment was marked as spam.


/**
* Returns the commit timestamp if the transaction committed successfully otherwise it will throw
* {@code IllegalStateException}.
*/
Timestamp getCommitTimestamp();

/**
* Returns the state of the transaction.
*/
TransactionState getState();

/**
* Closes the manager. If there is an active transaction, it will be rolled back. Underlying
* session will be released back to the session pool.
*/
@Override
void close();
}
Loading