diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index a0fb7e8ce8c8..0dbc39848011 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -515,7 +515,14 @@ func (r *Replica) handleRaftReady( // non-sensitive cue as to what happened. func (r *Replica) handleRaftReadyRaftMuLocked( ctx context.Context, inSnap IncomingSnapshot, -) (_ handleRaftReadyStats, _ string, foo error) { +) (handleRaftReadyStats, string, error) { + // handleRaftReadyRaftMuLocked is not prepared to handle context cancellation, + // so assert that it's given a non-cancellable context. + if ctx.Done() != nil { + return handleRaftReadyStats{}, "", errors.AssertionFailedf( + "handleRaftReadyRaftMuLocked cannot be called with a cancellable context") + } + var stats handleRaftReadyStats if inSnap.Desc != nil { stats.snap.offered = true diff --git a/pkg/kv/kvserver/scheduler.go b/pkg/kv/kvserver/scheduler.go index 761b18f0a9d6..c5e602227465 100644 --- a/pkg/kv/kvserver/scheduler.go +++ b/pkg/kv/kvserver/scheduler.go @@ -17,6 +17,7 @@ import ( "sync" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -160,9 +161,10 @@ type raftScheduleState struct { } type raftScheduler struct { - processor raftProcessor - latency *metric.Histogram - numWorkers int + ambientContext log.AmbientContext + processor raftProcessor + latency *metric.Histogram + numWorkers int mu struct { syncutil.Mutex @@ -176,19 +178,21 @@ type raftScheduler struct { } func newRaftScheduler( - metrics *StoreMetrics, processor raftProcessor, numWorkers int, + ambient log.AmbientContext, metrics *StoreMetrics, processor raftProcessor, numWorkers int, ) *raftScheduler { s := &raftScheduler{ - processor: processor, - latency: metrics.RaftSchedulerLatency, - numWorkers: numWorkers, + ambientContext: ambient, + processor: processor, + latency: metrics.RaftSchedulerLatency, + numWorkers: numWorkers, } s.mu.cond = sync.NewCond(&s.mu.Mutex) s.mu.state = make(map[roachpb.RangeID]raftScheduleState) return s } -func (s *raftScheduler) Start(ctx context.Context, stopper *stop.Stopper) { +func (s *raftScheduler) Start(stopper *stop.Stopper) { + ctx := s.ambientContext.AnnotateCtx(context.Background()) waitQuiesce := func(context.Context) { <-stopper.ShouldQuiesce() s.mu.Lock() diff --git a/pkg/kv/kvserver/scheduler_test.go b/pkg/kv/kvserver/scheduler_test.go index 767e0c247d72..c3b4fc3bc673 100644 --- a/pkg/kv/kvserver/scheduler_test.go +++ b/pkg/kv/kvserver/scheduler_test.go @@ -232,11 +232,11 @@ func TestSchedulerLoop(t *testing.T) { m := newStoreMetrics(metric.TestSampleInterval) p := newTestProcessor() - s := newRaftScheduler(m, p, 1) + s := newRaftScheduler(log.AmbientContext{}, m, p, 1) stopper := stop.NewStopper() ctx := context.Background() defer stopper.Stop(ctx) - s.Start(ctx, stopper) + s.Start(stopper) s.EnqueueRaftTicks(1, 2, 3) testutils.SucceedsSoon(t, func() error { @@ -258,11 +258,11 @@ func TestSchedulerBuffering(t *testing.T) { m := newStoreMetrics(metric.TestSampleInterval) p := newTestProcessor() - s := newRaftScheduler(m, p, 1) + s := newRaftScheduler(log.AmbientContext{}, m, p, 1) stopper := stop.NewStopper() ctx := context.Background() defer stopper.Stop(ctx) - s.Start(ctx, stopper) + s.Start(stopper) testCases := []struct { flag raftScheduleFlags diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 0de76856ab76..9c547d87c68d 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1114,7 +1114,7 @@ func NewStore( s.replRankings = newReplicaRankings() s.draining.Store(false) - s.scheduler = newRaftScheduler(s.metrics, s, storeSchedulerConcurrency) + s.scheduler = newRaftScheduler(cfg.AmbientCtx, s.metrics, s, storeSchedulerConcurrency) s.raftEntryCache = raftentry.NewCache(cfg.RaftEntryCacheSize) s.metrics.registry.AddMetricStruct(s.raftEntryCache.Metrics()) diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index df7387059cc4..533c1652c1a6 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -572,7 +572,7 @@ func (s *Store) processRaft(ctx context.Context) { return } - s.scheduler.Start(ctx, s.stopper) + s.scheduler.Start(s.stopper) // Wait for the scheduler worker goroutines to finish. if err := s.stopper.RunAsyncTask(ctx, "sched-wait", s.scheduler.Wait); err != nil { s.scheduler.Wait(ctx)