From a6fcb9488f643eaa687aa196e28380efecec0133 Mon Sep 17 00:00:00 2001 From: Scott Hart Date: Tue, 12 Aug 2025 11:06:28 -0400 Subject: [PATCH 1/2] fix(spanner): ExcludeTransactionFromChangeStreamsOption not always propagated --- .../cloud/spanner/internal/connection_impl.cc | 6 +++ .../spanner/internal/connection_impl_test.cc | 43 +++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/google/cloud/spanner/internal/connection_impl.cc b/google/cloud/spanner/internal/connection_impl.cc index ed9c4f4149e19..75979e2a19da0 100644 --- a/google/cloud/spanner/internal/connection_impl.cc +++ b/google/cloud/spanner/internal/connection_impl.cc @@ -580,6 +580,12 @@ StatusOr ConnectionImpl::BeginTransaction( auto stub = GetStubBasedOnSessionMode(*session, ctx); auto const& current = internal::CurrentOptions(); + + if (current.has() && + current.get()) { + begin.mutable_options()->set_exclude_txn_from_change_streams(true); + } + auto response = RetryLoop( RetryPolicyPrototype(current)->clone(), BackoffPolicyPrototype(current)->clone(), Idempotency::kIdempotent, diff --git a/google/cloud/spanner/internal/connection_impl_test.cc b/google/cloud/spanner/internal/connection_impl_test.cc index 84092971212f8..3ace933218263 100644 --- a/google/cloud/spanner/internal/connection_impl_test.cc +++ b/google/cloud/spanner/internal/connection_impl_test.cc @@ -3233,6 +3233,49 @@ TEST(ConnectionImplTest, CommitSuccessExcludeFromChangeStreams) { Eq(spanner::MakeTimestamp(absl::FromUnixSeconds(123)).value())))); } +// Reproduces issue b/346858290 where `exclude_txn_from_change_streams` is not +// propagated from the OptionsSpan to the BeginTransaction request. +TEST(ConnectionImplTest, CommitSuccessExcludeFromChangeStreamsExplicitTxn) { + auto mock = std::make_shared(); + auto db = spanner::Database("placeholder_project", "placeholder_instance", + "placeholder_database_id"); + EXPECT_CALL(*mock, BatchCreateSessions(_, _, HasDatabase(db))) + .WillOnce(Return(MakeSessionsResponse({"test-session-name"}))); + EXPECT_CALL(*mock, BeginTransaction) + .WillOnce( + [](grpc::ClientContext&, Options const&, + google::spanner::v1::BeginTransactionRequest const& request) { + EXPECT_TRUE(request.options().has_read_write()); + EXPECT_TRUE(request.options().exclude_txn_from_change_streams()); + return MakeTestTransaction(); + }); + EXPECT_CALL(*mock, Commit(_, _, HasSession("test-session-name"))) + .WillOnce(Return(MakeCommitResponse( + spanner::MakeTimestamp(std::chrono::system_clock::from_time_t(123)) + .value()))); + EXPECT_CALL(*mock, + AsyncDeleteSession(_, _, _, HasSessionName("test-session-name"))) + .WillOnce(Return(make_ready_future(Status{}))); + + auto conn = MakeConnectionImpl(db, mock); + internal::OptionsSpan span( + MakeLimitedTimeOptions() + .set(true)); + + // Introduce additional scope here to ensure that when txn is destroyed + // the session_pool contained by the Connection is still present, such that, + // the session associated with the transaction can be returned to the pool. + { + auto txn = spanner::Transaction(spanner::Transaction::ReadWriteOptions{}); + auto commit = conn->Commit({txn, {}}); + EXPECT_THAT( + commit, + IsOkAndHolds(Field( + &spanner::CommitResult::commit_timestamp, + Eq(spanner::MakeTimestamp(absl::FromUnixSeconds(123)).value())))); + } +} + TEST(ConnectionImplTest, CommitSuccessWithMaxCommitDelay) { auto mock = std::make_shared(); auto db = spanner::Database("placeholder_project", "placeholder_instance", From 18128709901ff7bd8501a38d60745d9582ad2cbc Mon Sep 17 00:00:00 2001 From: Scott Hart Date: Thu, 14 Aug 2025 17:34:42 -0400 Subject: [PATCH 2/2] adjust test and revert to original fix --- .../cloud/spanner/internal/connection_impl.cc | 3 +- .../spanner/internal/connection_impl_test.cc | 28 ++++++++----------- 2 files changed, 13 insertions(+), 18 deletions(-) diff --git a/google/cloud/spanner/internal/connection_impl.cc b/google/cloud/spanner/internal/connection_impl.cc index 75979e2a19da0..39e7c7daf57e3 100644 --- a/google/cloud/spanner/internal/connection_impl.cc +++ b/google/cloud/spanner/internal/connection_impl.cc @@ -581,8 +581,7 @@ StatusOr ConnectionImpl::BeginTransaction( auto stub = GetStubBasedOnSessionMode(*session, ctx); auto const& current = internal::CurrentOptions(); - if (current.has() && - current.get()) { + if (current.get()) { begin.mutable_options()->set_exclude_txn_from_change_streams(true); } diff --git a/google/cloud/spanner/internal/connection_impl_test.cc b/google/cloud/spanner/internal/connection_impl_test.cc index 3ace933218263..9a08aca6e7fa8 100644 --- a/google/cloud/spanner/internal/connection_impl_test.cc +++ b/google/cloud/spanner/internal/connection_impl_test.cc @@ -3253,27 +3253,23 @@ TEST(ConnectionImplTest, CommitSuccessExcludeFromChangeStreamsExplicitTxn) { .WillOnce(Return(MakeCommitResponse( spanner::MakeTimestamp(std::chrono::system_clock::from_time_t(123)) .value()))); - EXPECT_CALL(*mock, - AsyncDeleteSession(_, _, _, HasSessionName("test-session-name"))) - .WillOnce(Return(make_ready_future(Status{}))); + // Transaction is created before the usual Connection and Client creation. + // This is the crux of reproducing this issue. + auto txn = spanner::Transaction(spanner::Transaction::ReadWriteOptions{}); + + // Connection and OptionsSpan creation mimics what occurs prior to calling + // ConnectionImpl::Commit in Client::Commit(Transaction, Mutations, Options). auto conn = MakeConnectionImpl(db, mock); internal::OptionsSpan span( MakeLimitedTimeOptions() .set(true)); - - // Introduce additional scope here to ensure that when txn is destroyed - // the session_pool contained by the Connection is still present, such that, - // the session associated with the transaction can be returned to the pool. - { - auto txn = spanner::Transaction(spanner::Transaction::ReadWriteOptions{}); - auto commit = conn->Commit({txn, {}}); - EXPECT_THAT( - commit, - IsOkAndHolds(Field( - &spanner::CommitResult::commit_timestamp, - Eq(spanner::MakeTimestamp(absl::FromUnixSeconds(123)).value())))); - } + auto commit = conn->Commit({txn, {}}); + EXPECT_THAT( + commit, + IsOkAndHolds(Field( + &spanner::CommitResult::commit_timestamp, + Eq(spanner::MakeTimestamp(absl::FromUnixSeconds(123)).value())))); } TEST(ConnectionImplTest, CommitSuccessWithMaxCommitDelay) {