Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 30 additions & 21 deletions pkg/storage/replica_application.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,26 +305,30 @@ func (r *Replica) stageRaftCommand(
// TODO(ajwerner): coalesce the clock update per batch.
r.store.Clock().Update(ts)

{
err := r.applyRaftCommandToBatch(cmd.ctx, cmd, replicaState, batch, writeAppliedState)
// If the command was using the deprecated version of the MVCCStats proto,
// migrate it to the new version and clear out the field.
if deprecatedDelta := cmd.replicatedResult().DeprecatedDelta; deprecatedDelta != nil {
if cmd.replicatedResult().Delta != (enginepb.MVCCStatsDelta{}) {
log.Fatalf(ctx, "stats delta not empty but deprecated delta provided: %+v", cmd)
}
cmd.replicatedResult().Delta = deprecatedDelta.ToStatsDelta()
cmd.replicatedResult().DeprecatedDelta = nil
}

// Apply the Raft command to the batch's accumulated state. This may also
// have the effect of mutating cmd.replicatedResult().
err := r.applyRaftCommandToBatch(cmd.ctx, cmd, replicaState, batch, writeAppliedState)
if err != nil {
// applyRaftCommandToBatch returned an error, which usually indicates
// either a serious logic bug in CockroachDB or a disk
// corruption/out-of-space issue. Make sure that these fail with a
// descriptive message so that we can differentiate the root causes.
if err != nil {
log.Errorf(ctx, "unable to update the state machine: %+v", err)
// Report the fatal error separately and only with the error, as that
// triggers an optimization for which we directly report the error to
// sentry (which in turn allows sentry to distinguish different error
// types).
log.Fatal(ctx, err)
}
}

if deprecatedDelta := cmd.replicatedResult().DeprecatedDelta; deprecatedDelta != nil {
cmd.replicatedResult().Delta = deprecatedDelta.ToStatsDelta()
cmd.replicatedResult().DeprecatedDelta = nil
log.Errorf(ctx, "unable to update the state machine: %+v", err)
// Report the fatal error separately and only with the error, as that
// triggers an optimization for which we directly report the error to
// sentry (which in turn allows sentry to distinguish different error
// types).
log.Fatal(ctx, err)
}

// AddSSTable ingestions run before the actual batch gets written to the
Expand Down Expand Up @@ -559,7 +563,6 @@ func (r *Replica) applyRaftCommandToBatch(
if cmd.replicatedResult().Split != nil {
replicaState.Stats.ContainsEstimates = false
}
ms := replicaState.Stats

if cmd.e.Index != replicaState.RaftAppliedIndex+1 {
// If we have an out of order index, there's corruption. No sense in
Expand Down Expand Up @@ -597,6 +600,9 @@ func (r *Replica) applyRaftCommandToBatch(
// The Raft command wants us to begin using the RangeAppliedState key
// and we haven't performed the migration yet. Delete the old keys
// that this new key is replacing.
//
// NB: entering this branch indicates that the cmd was considered
// non-trivial and therefore placed in its own batch.
err := r.raftMu.stateLoader.MigrateToRangeAppliedStateKey(ctx, writer, &deltaStats)
if err != nil {
return errors.Wrap(err, "unable to migrate to range applied state")
Expand All @@ -610,15 +616,15 @@ func (r *Replica) applyRaftCommandToBatch(
// Note that calling ms.Add will never result in ms.LastUpdateNanos
// decreasing (and thus LastUpdateNanos tracks the maximum LastUpdateNanos
// across all deltaStats).
ms := *replicaState.Stats
ms.Add(deltaStats)

// Set the range applied state, which includes the last applied raft and
// lease index along with the mvcc stats, all in one key.
if err := r.raftMu.stateLoader.SetRangeAppliedState(ctx, writer,
cmd.e.Index, cmd.leaseIndex, ms); err != nil {
cmd.e.Index, cmd.leaseIndex, &ms); err != nil {
return errors.Wrap(err, "unable to set range applied state")
}
ms.Subtract(deltaStats)
} else {
// Advance the last applied index. We use a blind write in order to avoid
// reading the previous applied index keys on every write operation. This
Expand All @@ -634,12 +640,15 @@ func (r *Replica) applyRaftCommandToBatch(
// Note that calling ms.Add will never result in ms.LastUpdateNanos
// decreasing (and thus LastUpdateNanos tracks the maximum LastUpdateNanos
// across all deltaStats).
ms := *replicaState.Stats
ms.Add(deltaStats)
if err := r.raftMu.stateLoader.SetMVCCStats(ctx, writer, ms); err != nil {
if err := r.raftMu.stateLoader.SetMVCCStats(ctx, writer, &ms); err != nil {
return errors.Wrap(err, "unable to update MVCCStats")
}
ms.Subtract(deltaStats)
}
// We may have modified the effect on the range's stats that the application
// of the command will have. Update the command's stats delta to reflect this.
cmd.replicatedResult().Delta = deltaStats.ToStatsDelta()

// Close the Distinct() batch here now that we're done writing to it.
writer.Close()
Expand Down Expand Up @@ -676,6 +685,7 @@ func (r *Replica) applyRaftCommandToBatch(

start := timeutil.Now()

// TODO(ajwerner): This assertion no longer makes much sense.
var assertHS *raftpb.HardState
if util.RaceEnabled && cmd.replicatedResult().Split != nil {
rsl := stateloader.Make(cmd.replicatedResult().Split.RightDesc.RangeID)
Expand Down Expand Up @@ -707,7 +717,6 @@ func (r *Replica) applyRaftCommandToBatch(
// something more appropriate.
elapsed := timeutil.Since(start)
r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds())
cmd.replicatedResult().Delta = deltaStats.ToStatsDelta()
return nil
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/storage/stateloader/stateloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ func (rsl StateLoader) SetRangeAppliedState(
return engine.MVCCPutProto(ctx, eng, ms, rsl.RangeAppliedStateKey(), hlc.Timestamp{}, nil, &as)
}

// MigrateToRangeAppliedStateKey TODO
// MigrateToRangeAppliedStateKey deletes the keys that were replaced by the
// RangeAppliedState key.
func (rsl StateLoader) MigrateToRangeAppliedStateKey(
ctx context.Context, eng engine.ReadWriter, ms *enginepb.MVCCStats,
) error {
Expand Down Expand Up @@ -393,7 +394,11 @@ func (rsl StateLoader) CalcAppliedIndexSysBytes(appliedIndex, leaseAppliedIndex
// writeLegacyMVCCStatsInternal writes the given stats proto to the key
// returned by rsl.RangeStatsLegacyKey() via engine.MVCCPutProto, using an
// empty hlc.Timestamp and a nil stats argument, and returns that call's error.
func (rsl StateLoader) writeLegacyMVCCStatsInternal(
ctx context.Context, eng engine.ReadWriter, newMS *enginepb.MVCCStats,
) error {
// NOTE(review): this text comes from a diff view whose +/- markers were
// lost. The return directly below appears to be the removed line and the
// copy-based lines after it its replacement — confirm against the merged
// file before relying on either form.
return engine.MVCCPutProto(ctx, eng, nil, rsl.RangeStatsLegacyKey(), hlc.Timestamp{}, nil, newMS)
// NB: newMS is copied to prevent conditional calls to this method from
// causing the stats argument to escape. This is legacy code which does
// not need to be optimized for performance.
newMSCopy := *newMS
return engine.MVCCPutProto(ctx, eng, nil, rsl.RangeStatsLegacyKey(), hlc.Timestamp{}, nil, &newMSCopy)
}

// SetLegacyMVCCStats overwrites the legacy MVCC stats key.
Expand Down