Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/storage/replica_application_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func clearTrivialReplicatedEvalResultFields(r *storagepb.ReplicatedEvalResult) {
// engine but before its side-effects have been applied to the Replica's
// in-memory state. This method gives the command an opportunity to interact
// with testing knobs and to set up its local result if it was proposed
// locally. This is performed prior to handling the command's
// locally. This is performed prior to handling the command's
// ReplicatedEvalResult because the process of handling the replicated eval
// result will zero-out the struct to ensure that is has properly performed all
// of the implied side-effects.
Expand Down
39 changes: 27 additions & 12 deletions pkg/storage/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,19 +884,34 @@ func (sm *replicaStateMachine) ApplySideEffects(
}

// Mark the command as applied and return it as an apply.AppliedCommand.
// NB: Commands which were reproposed at a higher MaxLeaseIndex will not be
// considered local at this point as their proposal will have been detached
// in prepareLocalResult().
if cmd.IsLocal() {
if !cmd.Rejected() {
if cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex {
log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index")
}
if cmd.proposal.applied {
// If the command already applied then we shouldn't be "finishing" its
// application again because it should only be able to apply successfully
// once. We expect that when any reproposal for the same command attempts
// to apply it will be rejected by the below raft lease sequence or lease
// index check in checkForcedErr.
log.Fatalf(ctx, "command already applied: %+v; unexpected successful result", cmd)
}
rejected := cmd.Rejected()
higherReproposalsExist := cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex
if !rejected && higherReproposalsExist {
log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index")
}
if !rejected && cmd.proposal.applied {
// If the command already applied then we shouldn't be "finishing" its
// application again because it should only be able to apply successfully
// once. We expect that when any reproposal for the same command attempts
// to apply it will be rejected by the below raft lease sequence or lease
// index check in checkForcedErr.
log.Fatalf(ctx, "command already applied: %+v; unexpected successful result", cmd)
}
// If any reproposals at a higher MaxLeaseIndex exist we know that they will
// never successfully apply, remove them from the map to avoid future
// reproposals. If there is no command referencing this proposal at a higher
// MaxLeaseIndex then it will already have been removed (see
// shouldRemove in replicaDecoder.retrieveLocalProposals()). It is possible
// that a later command in this batch referred to this proposal but it must
// have failed because it carried the same MaxLeaseIndex.
if higherReproposalsExist {
sm.r.mu.Lock()
delete(sm.r.mu.proposals, cmd.idKey)
sm.r.mu.Unlock()
}
cmd.proposal.applied = true
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
}
applicationElapsed := timeutil.Since(applicationStart).Nanoseconds()
r.store.metrics.RaftApplyCommittedLatency.RecordValue(applicationElapsed)

if r.store.TestingKnobs().EnableUnconditionalRefreshesInRaftReady {
refreshReason = reasonNewLeaderOrConfigChange
}
if refreshReason != noReason {
r.mu.Lock()
r.refreshProposalsLocked(0, refreshReason)
Expand Down
71 changes: 40 additions & 31 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11568,16 +11568,35 @@ func TestSplitSnapshotWarningStr(t *testing.T) {
)
}

// TestHighestMaxLeaseIndexReproposalFinishesCommand exercises a case where a
// command is reproposed twice at different MaxLeaseIndex values to ultimately
// fail with an error which cannot be reproposed (say due to a lease transfer
// or change to the gc threshold). This test works to exercise the invariant
// that when a proposal has been reproposed at different MaxLeaseIndex value
// the client is ultimately acknowledged with an error from a reproposal with
// the largest index. The test verfies this condition by asserting that the
// TestProposalNotAcknowledgedOrReproposedAfterApplication exercises a case
// where a command is reproposed twice at different MaxLeaseIndex values to
// ultimately fail with an error which cannot be reproposed (say due to a lease
// transfer or change to the gc threshold). This test works to exercise the
// invariant that when a proposal has been reproposed at different MaxLeaseIndex
// values are not additionally reproposed or acknowledged after applying
// locally. The test verfies this condition by asserting that the
// span used to trace the execution of the proposal is not used after the
// proposal has been finished.
func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) {
// proposal has been finished as it would be if the proposal were reproposed
// after applying locally.
//
// The test does the following things:
//
// * Propose cmd at an initial MaxLeaseIndex.
// * Refresh that cmd immediately.
// * Fail the initial command with an injected error which will lead to a
// reproposal at a higher MaxLeaseIndex.
// * Simultaneously update the lease sequence number on the replica so all
// future commands will fail with NotLeaseHolderError.
// * Enable unconditional refreshes of commands after a raft ready so that
// higher MaxLeaseIndex commands are refreshed.
//
// This order of events ensures that there will be a committed command which
// experiences the lease mismatch error but does not carry the highest
// MaxLeaseIndex for the proposal. The test attempts to verify that once a
// proposal has been acknowledged it will not be reproposed or acknowledged
// again by asserting that the proposal's context is not reused after it is
// finished by the waiting client.
func TestProposalNotAcknowledgedOrReproposedAfterApplication(t *testing.T) {
defer leaktest.AfterTest(t)()

// Set the trace infrastructure to log if a span is used after being finished.
Expand Down Expand Up @@ -11608,12 +11627,12 @@ func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) {
// In the TestingProposalFilter we populater cmdID with the id of the proposal
// which corresponds to txnID.
var cmdID storagebase.CmdIDKey
// After we evalAndPropose the command we populate prop with the ProposalData
// value to enable reproposing the same command more than once.
var prop *ProposalData
// seen is used to detect the first application of our proposal.
var seen bool
cfg.TestingKnobs = StoreTestingKnobs{
// Constant reproposals are the worst case which this test is trying to
// examine.
EnableUnconditionalRefreshesInRaftReady: true,
// Set the TestingProposalFilter in order to know the CmdIDKey for our
// request by detecting its txnID.
TestingProposalFilter: func(args storagebase.ProposalFilterArgs) *roachpb.Error {
Expand All @@ -11629,30 +11648,21 @@ func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) {
return 0, nil
}
seen = true
// Repropose on a separate location to not mess with the
// goldenProtosBelowRaft checks.
reproposed := make(chan struct{})
go func() {
if _, pErr := tc.repl.propose(prop.ctx, prop); pErr != nil {
panic(pErr)
}
close(reproposed)
}()
<-reproposed
tc.repl.mu.Lock()
defer tc.repl.mu.Unlock()
// Flush the proposalBuf to ensure that the reproposal makes it into the
// Replica's proposal map.
if err := tc.repl.mu.proposalBuf.flushLocked(); err != nil {
panic(err)
}

// Increase the lease sequence so that future reproposals will fail with
// NotLeaseHolderError. This mimics the outcome of a leaseholder change
// slipping in between the application of the first proposal and the
// reproposals.
tc.repl.mu.state.Lease.Sequence++
// This return value will force another retry which will carry a yet
// higher MaxLeaseIndex and will trigger our invariant violation.
// higher MaxLeaseIndex. The first reproposal will fail and return to the
// client but the second (which hasn't been applied due to the
// MaxCommittedSizePerReady setting) will be reproposed again. This test
// ensure that it does not reuse the original proposal's context for that
// reproposal by ensuring that no event is recorded after the original
// proposal has been finished.
return int(proposalIllegalLeaseIndex),
roachpb.NewErrorf("forced error that can be reproposed at a higher index")
},
Expand All @@ -11678,8 +11688,6 @@ func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) {

// Hold the RaftLock to ensure that after evalAndPropose our proposal is in
// the proposal map. Entries are only removed from that map underneath raft.
// We want to grab the proposal so that we can shove in an extra reproposal
// while the first proposal is being applied.
tc.repl.RaftLock()
tracedCtx, cleanup := tracing.EnsureContext(ctx, cfg.AmbientCtx.Tracer, "replica send")
ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, lease, &ba, &allSpans, endCmds{})
Expand All @@ -11693,14 +11701,15 @@ func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) {
errCh <- res.Err
}()

// While still holding the raftMu, repropose the initial proposal so we know
// that there will be two instances
func() {
tc.repl.mu.Lock()
defer tc.repl.mu.Unlock()
if err := tc.repl.mu.proposalBuf.flushLocked(); err != nil {
t.Fatal(err)
}
tc.repl.refreshProposalsLocked(0, reasonNewLeaderOrConfigChange)
prop = tc.repl.mu.proposals[cmdID]
}()
tc.repl.RaftUnlock()

Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ type StoreTestingKnobs struct {
// TraceAllRaftEvents enables raft event tracing even when the current
// vmodule would not have enabled it.
TraceAllRaftEvents bool
// EnableUnconditionalRefreshesInRaftReady will always set the refresh reason
// in handleRaftReady to refreshReasonNewLeaderOrConfigChange.
EnableUnconditionalRefreshesInRaftReady bool

// ReceiveSnapshot is run after receiving a snapshot header but before
// acquiring snapshot quota or doing shouldAcceptSnapshotData checks. If an
Expand Down