Skip to content

Commit c8734b9

Browse files
authored
chore: allow tx opts to be supplied in a callback (#12779)
The database/sql driver uses inline-begin transaction by default for all transaction shapes. It also allows setting transaction options after the transaction has been started, but not yet activated. For example the following script is allowed: ``` BEGIN TRANSACTION; SET ISOLATION_LEVEL='repeatable_read'; UPDATE my_table SET my_col=1 WHERE id=1; COMMIT; ``` The actual start of the transaction is the UPDATE statement. The isolation level is set after the BEGIN TRANSACTION statement, and should be applied to the current transaction. In order to enable this, statement-based transactions should allow a callback to be registered that will be called when the transaction is actually started. This callback should return any updated transaction options since the BEGIN TRANSACTION call.
1 parent 7f574b0 commit c8734b9

File tree

3 files changed

+85
-10
lines changed

3 files changed

+85
-10
lines changed

spanner/client_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6782,6 +6782,42 @@ func TestClient_BatchWriteExcludeTxnFromChangeStreams(t *testing.T) {
67826782
}
67836783
}
67846784

6785+
func TestClient_DelayedExcludeTxnFromChangeStreams(t *testing.T) {
6786+
server, client, teardown := setupMockedTestServer(t)
6787+
defer teardown()
6788+
ctx := context.Background()
6789+
6790+
ms := []*Mutation{
6791+
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
6792+
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
6793+
}
6794+
6795+
// Randomly exclude the transaction from change streams.
6796+
exclude := time.Now().UnixNano()%2 == 0
6797+
// Start a transaction without an ExcludeTxnFromChangeStreams option, but register a callback that allows us to set
6798+
// it right before the transaction is actually started.
6799+
tx, err := NewReadWriteStmtBasedTransactionWithCallbackForOptions(ctx, client, TransactionOptions{}, func() TransactionOptions {
6800+
return TransactionOptions{ExcludeTxnFromChangeStreams: exclude}
6801+
})
6802+
if err != nil {
6803+
t.Fatal(err)
6804+
}
6805+
_ = tx.BufferWrite(ms)
6806+
if _, err := tx.Commit(ctx); err != nil {
6807+
t.Fatal(err)
6808+
}
6809+
6810+
requests := drainRequestsFromServer(server.TestSpanner)
6811+
beginRequests := requestsOfType(requests, reflect.TypeOf(&sppb.BeginTransactionRequest{}))
6812+
if g, w := len(beginRequests), 1; g != w {
6813+
t.Fatalf("num BeginTransaction requests mismatch\n Got: %v\nWant: %v", g, w)
6814+
}
6815+
beginRequest := beginRequests[0].(*sppb.BeginTransactionRequest)
6816+
if g, w := beginRequest.Options.ExcludeTxnFromChangeStreams, exclude; g != w {
6817+
t.Fatalf("exclude option mismatch\n Got: %v\nWant: %v", g, w)
6818+
}
6819+
}
6820+
67856821
func TestParseServerTimingHeader(t *testing.T) {
67866822
tests := []struct {
67876823
name string

spanner/transaction.go

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1327,6 +1327,16 @@ type ReadWriteTransaction struct {
13271327
wb []*Mutation
13281328
// isLongRunningTransaction indicates whether the transaction is long-running or not.
13291329
isLongRunningTransaction bool
1330+
// getTransactionOptionsCallback is a callback function that is called right before the
1331+
// transaction is actually started (either inlined or with an explicit BeginTransaction RPC).
1332+
// This callback can be used for transactions that do not yet know all the options at the
1333+
// moment that the start of the transaction is registered. This allows the following type of
1334+
// scripts to be executed by the database/sql driver:
1335+
// BEGIN TRANSACTION
1336+
// SET ISOLATION_LEVEL='repeatable_read' -- This sets an additional option after the transaction was started.
1337+
// UPDATE my_table SET my_col=1 WHERE id=1 -- This triggers the actual creation of the transaction.
1338+
// COMMIT
1339+
getTransactionOptionsCallback func() TransactionOptions
13301340
}
13311341

13321342
func (t *ReadWriteTransaction) isDefaultInlinedBegin() bool {
@@ -1563,11 +1573,16 @@ func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sp
15631573
// is accepted.
15641574
t.state = txInit
15651575
sh := t.sh
1576+
if t.getTransactionOptionsCallback != nil {
1577+
t.txOpts = t.txOpts.merge(t.getTransactionOptionsCallback())
1578+
}
15661579
ts := &sppb.TransactionSelector{
15671580
Selector: &sppb.TransactionSelector_Begin{
15681581
Begin: &sppb.TransactionOptions{
15691582
Mode: &sppb.TransactionOptions_ReadWrite_{
1570-
ReadWrite: &sppb.TransactionOptions_ReadWrite{},
1583+
ReadWrite: &sppb.TransactionOptions_ReadWrite{
1584+
ReadLockMode: t.txOpts.ReadLockMode,
1585+
},
15711586
},
15721587
ExcludeTxnFromChangeStreams: t.txOpts.ExcludeTxnFromChangeStreams,
15731588
IsolationLevel: t.txOpts.IsolationLevel,
@@ -1622,6 +1637,9 @@ func (t *ReadWriteTransaction) getTransactionSelector() *sppb.TransactionSelecto
16221637
},
16231638
}
16241639
}
1640+
if t.getTransactionOptionsCallback != nil {
1641+
t.txOpts = t.txOpts.merge(t.getTransactionOptionsCallback())
1642+
}
16251643
mode := &sppb.TransactionOptions_ReadWrite_{
16261644
ReadWrite: &sppb.TransactionOptions_ReadWrite{
16271645
ReadLockMode: t.txOpts.ReadLockMode,
@@ -1786,6 +1804,9 @@ func (t *ReadWriteTransaction) begin(ctx context.Context, mutation *sppb.Mutatio
17861804
if sh != nil {
17871805
sh.updateLastUseTime()
17881806
}
1807+
if t.getTransactionOptionsCallback != nil {
1808+
t.txOpts = t.txOpts.merge(t.getTransactionOptionsCallback())
1809+
}
17891810
tx, precommitToken, err = beginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), transactionBeginOptions{
17901811
multiplexEnabled: t.sp.isMultiplexedSessionForRWEnabled(),
17911812
sessionID: sh.getID(),
@@ -2021,8 +2042,7 @@ type ReadWriteStmtBasedTransaction struct {
20212042
// ReadWriteTransaction contains methods for performing transactional reads.
20222043
ReadWriteTransaction
20232044

2024-
client *Client
2025-
options TransactionOptions
2045+
client *Client
20262046
}
20272047

20282048
func (t *ReadWriteStmtBasedTransaction) isDefaultInlinedBegin() bool {
@@ -2060,10 +2080,29 @@ func NewReadWriteStmtBasedTransaction(ctx context.Context, c *Client) (*ReadWrit
20602080
// NewReadWriteStmtBasedTransactionWithOptions is a configurable version of
20612081
// NewReadWriteStmtBasedTransaction.
20622082
func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client, options TransactionOptions) (*ReadWriteStmtBasedTransaction, error) {
2063-
return newReadWriteStmtBasedTransactionWithSessionHandle(ctx, c, options, nil, nil)
2083+
return newReadWriteStmtBasedTransactionWithSessionHandle(ctx, c, options, nil, nil, nil)
2084+
}
2085+
2086+
// NewReadWriteStmtBasedTransactionWithCallbackForOptions starts a read-write
2087+
// transaction with a callback that gives the actual transaction options.
2088+
// Commit() or Rollback() must be called to end a transaction. If Commit() or
2089+
// Rollback() is not called, the session that is used by the transaction will
2090+
// not be returned to the pool and cause a session leak.
2091+
//
2092+
// ResetForRetry resets the transaction before a retry attempt. This function
2093+
// returns a new transaction that should be used for the retry attempt. The
2094+
// transaction that is returned by this function is assigned a higher priority
2095+
// than the previous transaction, making it less probable to be aborted by
2096+
// Spanner again during the retry.
2097+
//
2098+
// NewReadWriteStmtBasedTransactionWithCallbackForOptions is the same as
2099+
// NewReadWriteStmtBasedTransactionWithOptions, but allows the caller to wait
2100+
// with setting the actual transaction options until a later moment.
2101+
func NewReadWriteStmtBasedTransactionWithCallbackForOptions(ctx context.Context, c *Client, opts TransactionOptions, callback func() TransactionOptions) (*ReadWriteStmtBasedTransaction, error) {
2102+
return newReadWriteStmtBasedTransactionWithSessionHandle(ctx, c, opts, nil, nil, callback)
20642103
}
20652104

2066-
func newReadWriteStmtBasedTransactionWithSessionHandle(ctx context.Context, c *Client, options TransactionOptions, sh *sessionHandle, previousTransactionID transactionID) (*ReadWriteStmtBasedTransaction, error) {
2105+
func newReadWriteStmtBasedTransactionWithSessionHandle(ctx context.Context, c *Client, options TransactionOptions, sh *sessionHandle, previousTransactionID transactionID, callback func() TransactionOptions) (*ReadWriteStmtBasedTransaction, error) {
20672106
var (
20682107
err error
20692108
t *ReadWriteStmtBasedTransaction
@@ -2081,7 +2120,8 @@ func newReadWriteStmtBasedTransactionWithSessionHandle(ctx context.Context, c *C
20812120
}
20822121
t = &ReadWriteStmtBasedTransaction{
20832122
ReadWriteTransaction: ReadWriteTransaction{
2084-
txReadyOrClosed: make(chan struct{}),
2123+
txReadyOrClosed: make(chan struct{}),
2124+
getTransactionOptionsCallback: callback,
20852125
},
20862126
client: c,
20872127
}
@@ -2120,7 +2160,6 @@ func newReadWriteStmtBasedTransactionWithSessionHandle(ctx context.Context, c *C
21202160
return err
21212161
}
21222162

2123-
t.options = options
21242163
t.txOpts = c.txo.merge(options)
21252164
t.ct = c.ct
21262165
t.otConfig = c.otConfig
@@ -2202,8 +2241,8 @@ func (t *ReadWriteStmtBasedTransaction) ResetForRetry(ctx context.Context) (*Rea
22022241
// Create a new transaction that re-uses the current session if it is available.
22032242
// It should always use an explicit BeginTransaction RPC to ensure that the first
22042243
// statement is included in the transaction.
2205-
t.options.BeginTransactionOption = ExplicitBeginTransaction
2206-
return newReadWriteStmtBasedTransactionWithSessionHandle(ctx, t.client, t.options, t.sh, previousTransactionID)
2244+
t.txOpts.BeginTransactionOption = ExplicitBeginTransaction
2245+
return newReadWriteStmtBasedTransactionWithSessionHandle(ctx, t.client, t.txOpts, t.sh, previousTransactionID, t.getTransactionOptionsCallback)
22072246
}
22082247

22092248
// writeOnlyTransaction provides the most efficient way of doing write-only

spanner/transaction_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1588,7 +1588,7 @@ func testReadWriteStmtBasedTransaction(t *testing.T, beginTransactionOption Begi
15881588
if err != nil {
15891589
return 0, attempts, fmt.Errorf("failed to begin a transaction: %v", err)
15901590
}
1591-
if g, w := tx.options.TransactionTag, "test"; g != w {
1591+
if g, w := tx.txOpts.TransactionTag, "test"; g != w {
15921592
t.Errorf("transaction tag mismatch\n Got: %v\nWant: %v", g, w)
15931593
}
15941594
rowCount, err = f(tx)

0 commit comments

Comments
 (0)