Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,14 @@ func (r *Replica) handleRaftReady(
// non-sensitive cue as to what happened.
func (r *Replica) handleRaftReadyRaftMuLocked(
ctx context.Context, inSnap IncomingSnapshot,
) (_ handleRaftReadyStats, _ string, foo error) {
) (handleRaftReadyStats, string, error) {
// handleRaftReadyRaftMuLocked is not prepared to handle context cancellation,
// so assert that it's given a non-cancellable context.
if ctx.Done() != nil {
return handleRaftReadyStats{}, "", errors.AssertionFailedf(
"handleRaftReadyRaftMuLocked cannot be called with a cancellable context")
}

var stats handleRaftReadyStats
if inSnap.Desc != nil {
stats.snap.offered = true
Expand Down
20 changes: 12 additions & 8 deletions pkg/kv/kvserver/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"sync"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -160,9 +161,10 @@ type raftScheduleState struct {
}

type raftScheduler struct {
processor raftProcessor
latency *metric.Histogram
numWorkers int
ambientContext log.AmbientContext
processor raftProcessor
latency *metric.Histogram
numWorkers int

mu struct {
syncutil.Mutex
Expand All @@ -176,19 +178,21 @@ type raftScheduler struct {
}

func newRaftScheduler(
metrics *StoreMetrics, processor raftProcessor, numWorkers int,
ambient log.AmbientContext, metrics *StoreMetrics, processor raftProcessor, numWorkers int,
) *raftScheduler {
s := &raftScheduler{
processor: processor,
latency: metrics.RaftSchedulerLatency,
numWorkers: numWorkers,
ambientContext: ambient,
processor: processor,
latency: metrics.RaftSchedulerLatency,
numWorkers: numWorkers,
}
s.mu.cond = sync.NewCond(&s.mu.Mutex)
s.mu.state = make(map[roachpb.RangeID]raftScheduleState)
return s
}

func (s *raftScheduler) Start(ctx context.Context, stopper *stop.Stopper) {
func (s *raftScheduler) Start(stopper *stop.Stopper) {
ctx := s.ambientContext.AnnotateCtx(context.Background())
waitQuiesce := func(context.Context) {
<-stopper.ShouldQuiesce()
s.mu.Lock()
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,11 @@ func TestSchedulerLoop(t *testing.T) {

m := newStoreMetrics(metric.TestSampleInterval)
p := newTestProcessor()
s := newRaftScheduler(m, p, 1)
s := newRaftScheduler(log.AmbientContext{}, m, p, 1)
stopper := stop.NewStopper()
ctx := context.Background()
defer stopper.Stop(ctx)
s.Start(ctx, stopper)
s.Start(stopper)
s.EnqueueRaftTicks(1, 2, 3)

testutils.SucceedsSoon(t, func() error {
Expand All @@ -258,11 +258,11 @@ func TestSchedulerBuffering(t *testing.T) {

m := newStoreMetrics(metric.TestSampleInterval)
p := newTestProcessor()
s := newRaftScheduler(m, p, 1)
s := newRaftScheduler(log.AmbientContext{}, m, p, 1)
stopper := stop.NewStopper()
ctx := context.Background()
defer stopper.Stop(ctx)
s.Start(ctx, stopper)
s.Start(stopper)

testCases := []struct {
flag raftScheduleFlags
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,7 @@ func NewStore(
s.replRankings = newReplicaRankings()

s.draining.Store(false)
s.scheduler = newRaftScheduler(s.metrics, s, storeSchedulerConcurrency)
s.scheduler = newRaftScheduler(cfg.AmbientCtx, s.metrics, s, storeSchedulerConcurrency)

s.raftEntryCache = raftentry.NewCache(cfg.RaftEntryCacheSize)
s.metrics.registry.AddMetricStruct(s.raftEntryCache.Metrics())
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ func (s *Store) processRaft(ctx context.Context) {
return
}

s.scheduler.Start(ctx, s.stopper)
s.scheduler.Start(s.stopper)
// Wait for the scheduler worker goroutines to finish.
if err := s.stopper.RunAsyncTask(ctx, "sched-wait", s.scheduler.Wait); err != nil {
s.scheduler.Wait(ctx)
Expand Down