Commit 7f77d1a

kv: integrate raft async storage writes
Fixes cockroachdb#17500.
Waiting on github.com/cockroachdb/pebble/pull/2117.

This commit integrates with the `AsyncStorageWrites` functionality that we added to Raft in github.com/etcd-io/raft/pull/8.

## Approach

The commit makes the minimal changes needed to integrate with async storage writes and pull fsyncs out of the raft state machine loop. It does not make an effort to extract the non-durable portion of raft log writes or raft log application onto separate goroutine pools, as was described in cockroachdb#17500. Those changes will also be impactful, but they're non-trivial and bump into a pipelining vs. batching trade-off, so they are left as future work items (TODO(nvanbenschoten): open new issues).

With this change, asynchronous Raft log syncs are enabled by the new `DB.ApplyNoSyncWait` Pebble API introduced in github.com/cockroachdb/pebble/pull/2117. The `handleRaftReady` state machine loop continues to initiate Raft log writes, but it uses the Pebble API to offload waiting on durability to a separate goroutine. This separate goroutine then sends the corresponding `MsgStorageAppend` response messages where they need to go (locally and/or to the Raft leader) when the fsync completes. The async storage writes functionality in Raft makes this all safe.

## Benchmark Results

The result of this change is reduced interference between Raft proposals. As a result, it reduces end-to-end commit latency. github.com/etcd-io/raft/pull/8 presented a collection of benchmark results captured from integrating async storage writes with rafttoy. When integrated into CockroachDB, we see similar improvements to average and tail latency. However, it doesn't provide the throughput improvements at the top end because log appends and state machine application have not yet been extracted into separate goroutine pools, which would facilitate increased opportunity for batching.

TODO: add images

----

Release note (performance improvement): The Raft proposal pipeline has been optimized to reduce interference between Raft proposals. This improves average and tail write latency at high concurrency.

1 parent 063f37e commit 7f77d1a
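
To make the hand-off described in the commit message concrete, here is a minimal, self-contained Go sketch of the pattern: the ready loop initiates the log write, then enqueues the durability wait on a dedicated goroutine that fires a callback once the write is durable. The names (`pendingSync`, `syncWaiterLoop`) and the simulated fsync are illustrative stand-ins, not the actual CockroachDB or Pebble APIs.

```go
package main

import (
	"fmt"
	"time"
)

// pendingSync models a log write whose fsync has been initiated but not yet
// awaited. waitForSync blocks until the write is durable; onDurable delivers
// the corresponding MsgStorageAppend responses.
type pendingSync struct {
	waitForSync func()
	onDurable   func()
}

// syncWaiterLoop drains pending syncs in order on a dedicated goroutine,
// keeping fsync waits out of the Raft state machine loop.
func syncWaiterLoop(q <-chan pendingSync) {
	for w := range q {
		w.waitForSync() // block here, not in handleRaftReady
		w.onDurable()   // e.g. step MsgStorageAppendResp into the RawNode
	}
}

func main() {
	q := make(chan pendingSync, 128)
	go syncWaiterLoop(q)

	// In the Raft ready loop: start the (non-blocking) commit, then hand off
	// the wait so the loop can move on to the next batch of work.
	done := make(chan struct{})
	q <- pendingSync{
		waitForSync: func() { time.Sleep(10 * time.Millisecond) }, // simulated fsync
		onDurable: func() {
			fmt.Println("log write durable; responses delivered")
			close(done)
		},
	}
	<-done
}
```

The design choice mirrored here is that the ready loop never blocks on fsync; only the dedicated waiter goroutine does, so later proposals are not delayed by disk latency.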

10 files changed: +630 -356 lines changed


pkg/kv/kvserver/logstore/BUILD.bazel

Lines changed: 5 additions & 0 deletions
@@ -8,6 +8,7 @@ go_library(
         "sideload.go",
         "sideload_disk.go",
         "stateloader.go",
+        "sync_waiter.go",
     ],
     importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore",
     visibility = ["//visibility:public"],
@@ -28,9 +29,11 @@ go_library(
         "//pkg/util/log",
         "//pkg/util/metric",
         "//pkg/util/protoutil",
+        "//pkg/util/stop",
         "//pkg/util/timeutil",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_cockroachdb_errors//oserror",
+        "@com_github_cockroachdb_pebble//record",
         "@com_github_cockroachdb_redact//:redact",
         "@io_etcd_go_raft_v3//:raft",
         "@io_etcd_go_raft_v3//raftpb",
@@ -43,6 +46,7 @@ go_test(
     srcs = [
         "logstore_bench_test.go",
        "sideload_test.go",
+        "sync_waiter_test.go",
    ],
     args = ["-test.timeout=295s"],
     embed = [":logstore"],
@@ -60,6 +64,7 @@ go_test(
         "//pkg/util/log",
         "//pkg/util/metric",
         "//pkg/util/protoutil",
+        "//pkg/util/stop",
         "//pkg/util/tracing",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_cockroachdb_errors//oserror",

pkg/kv/kvserver/logstore/logstore.go

Lines changed: 108 additions & 42 deletions
@@ -13,6 +13,7 @@ package logstore
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"time"
 
@@ -44,25 +45,24 @@ var disableSyncRaftLog = settings.RegisterBoolSetting(
 	envutil.EnvOrDefaultBool("COCKROACH_DISABLE_RAFT_LOG_SYNCHRONIZATION_UNSAFE", false),
 )
 
-// Ready contains the log entries and state to be saved to stable storage. This
-// is a subset of raft.Ready relevant to log storage. All fields are read-only.
-type Ready struct {
-	// The current state of a replica to be saved to stable storage. Empty if
-	// there is no update.
-	raftpb.HardState
+var enableNonBlockingRaftLogSync = settings.RegisterBoolSetting(
+	settings.TenantWritable,
+	"kv.raft_log.non_blocking_synchronization.enabled",
+	"set to true to enable non-blocking synchronization on Raft log writes to "+
+		"persistent storage. Setting to true does not risk data loss or data corruption "+
+		"on server crashes, but can reduce write latency.",
+	envutil.EnvOrDefaultBool("COCKROACH_ENABLE_RAFT_LOG_NON_BLOCKING_SYNCHRONIZATION", true),
+)
 
-	// Entries specifies entries to be saved to stable storage. Empty if there is
-	// no update.
-	Entries []raftpb.Entry
+// MsgStorageAppend is a raftpb.Message with type MsgStorageAppend.
+type MsgStorageAppend raftpb.Message
 
-	// MustSync indicates whether the HardState and Entries must be synchronously
-	// written to disk, or if an asynchronous write is permissible.
-	MustSync bool
-}
-
-// MakeReady constructs a Ready struct from raft.Ready.
-func MakeReady(from raft.Ready) Ready {
-	return Ready{HardState: from.HardState, Entries: from.Entries, MustSync: from.MustSync}
+// MakeMsgStorageAppend constructs a MsgStorageAppend from a raftpb.Message.
+func MakeMsgStorageAppend(m raftpb.Message) MsgStorageAppend {
+	if m.Type != raftpb.MsgStorageAppend {
+		panic(fmt.Sprintf("unexpected message type %s", m.Type))
+	}
+	return MsgStorageAppend(m)
 }
 
 // RaftState stores information about the last entry and the size of the log.
@@ -87,6 +87,8 @@ type AppendStats struct {
 	PebbleBytes int64
 
 	Sync bool
+	// If true, PebbleEnd-PebbleBegin does not include the sync time.
+	NonBlocking bool
 }
 
 // Metrics contains metrics specific to the log storage.
@@ -100,37 +102,67 @@ type LogStore struct {
 	Engine      storage.Engine
 	Sideload    SideloadStorage
 	StateLoader StateLoader
+	SyncWaiter  *SyncWaiterLoop
 	EntryCache  *raftentry.Cache
 	Settings    *cluster.Settings
 	Metrics     Metrics
 }
 
+// SyncCallback is a callback that is notified when a raft log write has been
+// durably committed to disk. The function is handed the response messages that
+// are associated with the MsgStorageAppend that triggered the fsync.
+type SyncCallback interface {
+	OnLogSync(context.Context, []raftpb.Message)
+}
+
 func newStoreEntriesBatch(eng storage.Engine) storage.Batch {
 	// Use an unindexed batch because we don't need to read our writes, and
 	// it is more efficient.
 	return eng.NewUnindexedBatch(false /* writeOnly */)
 }
 
-// StoreEntries persists newly appended Raft log Entries to the log storage.
+// StoreEntries persists newly appended Raft log Entries to the log storage,
+// then calls the provided callback with the input's response messages (if any)
+// once the entries are durable. The durable log write may or may not be
+// blocking (and therefore the callback may or may not be called synchronously),
+// depending on the kv.raft_log.non_blocking_synchronization.enabled cluster
+// setting. Either way, the effects of the log append will be immediately
+// visible to readers of the Engine.
+//
 // Accepts the state of the log before the operation, returns the state after.
 // Persists HardState atomically with, or strictly after Entries.
 func (s *LogStore) StoreEntries(
-	ctx context.Context, state RaftState, rd Ready, stats *AppendStats,
+	ctx context.Context, state RaftState, m MsgStorageAppend, cb SyncCallback, stats *AppendStats,
 ) (RaftState, error) {
 	batch := newStoreEntriesBatch(s.Engine)
-	defer batch.Close()
-	return s.storeEntriesAndCommitBatch(ctx, state, rd, stats, batch)
+	return s.storeEntriesAndCommitBatch(ctx, state, m, cb, stats, batch)
 }
 
+// storeEntriesAndCommitBatch is like StoreEntries, but it accepts a
+// storage.Batch, which it takes responsibility for committing and closing.
 func (s *LogStore) storeEntriesAndCommitBatch(
-	ctx context.Context, state RaftState, rd Ready, stats *AppendStats, batch storage.Batch,
+	ctx context.Context,
+	state RaftState,
+	m MsgStorageAppend,
+	cb SyncCallback,
+	stats *AppendStats,
+	batch storage.Batch,
 ) (RaftState, error) {
+	// Before returning, Close the batch if we haven't handed ownership of it to a
+	// SyncWaiterLoop. If batch == nil, SyncWaiterLoop is responsible for closing
+	// it once the in-progress disk writes complete.
+	defer func() {
+		if batch != nil {
+			defer batch.Close()
+		}
+	}()
+
 	prevLastIndex := state.LastIndex
-	if len(rd.Entries) > 0 {
+	if len(m.Entries) > 0 {
 		stats.Begin = timeutil.Now()
 		// All of the entries are appended to distinct keys, returning a new
 		// last index.
-		thinEntries, numSideloaded, sideLoadedEntriesSize, otherEntriesSize, err := MaybeSideloadEntries(ctx, rd.Entries, s.Sideload)
+		thinEntries, numSideloaded, sideLoadedEntriesSize, otherEntriesSize, err := MaybeSideloadEntries(ctx, m.Entries, s.Sideload)
 		if err != nil {
 			const expl = "during sideloading"
 			return RaftState{}, errors.Wrap(err, expl)
@@ -149,16 +181,21 @@ func (s *LogStore) storeEntriesAndCommitBatch(
 		stats.End = timeutil.Now()
 	}
 
-	if !raft.IsEmptyHardState(rd.HardState) {
+	hs := raftpb.HardState{
+		Term:   m.Term,
+		Vote:   m.Vote,
+		Commit: m.Commit,
+	}
+	if !raft.IsEmptyHardState(hs) {
 		// NB: Note that without additional safeguards, it's incorrect to write
-		// the HardState before appending rd.Entries. When catching up, a follower
+		// the HardState before appending m.Entries. When catching up, a follower
 		// will receive Entries that are immediately Committed in the same
 		// Ready. If we persist the HardState but happen to lose the Entries,
 		// assertions can be tripped.
 		//
 		// We have both in the same batch, so there's no problem. If that ever
 		// changes, we must write and sync the Entries before the HardState.
-		if err := s.StateLoader.SetHardState(ctx, batch, rd.HardState); err != nil {
+		if err := s.StateLoader.SetHardState(ctx, batch, hs); err != nil {
 			const expl = "during setHardState"
 			return RaftState{}, errors.Wrap(err, expl)
 		}
@@ -168,9 +205,9 @@ func (s *LogStore) storeEntriesAndCommitBatch(
 	//
 	// Note that the data is visible to other goroutines before it is synced to
 	// disk. This is fine. The important constraints are that these syncs happen
-	// before Raft messages are sent and before the call to RawNode.Advance. Our
-	// regular locking is sufficient for this and if other goroutines can see the
-	// data early, that's fine. In particular, snapshots are not a problem (I
+	// before the MsgStorageAppend's responses are delivered back to the RawNode.
+	// Our regular locking is sufficient for this and if other goroutines can see
+	// the data early, that's fine. In particular, snapshots are not a problem (I
 	// think they're the only thing that might access log entries or HardState
 	// from other goroutines). Snapshots do not include either the HardState or
 	// uncommitted log entries, and even if they did include log entries that
@@ -183,25 +220,54 @@ func (s *LogStore) storeEntriesAndCommitBatch(
 	// (Replica), so this comment might need to move.
 	stats.PebbleBegin = timeutil.Now()
 	stats.PebbleBytes = int64(batch.Len())
-	sync := rd.MustSync && !disableSyncRaftLog.Get(&s.Settings.SV)
-	if err := batch.Commit(sync); err != nil {
-		const expl = "while committing batch"
-		return RaftState{}, errors.Wrap(err, expl)
+	mustSync := len(m.Responses) > 0
+	sync := mustSync && !disableSyncRaftLog.Get(&s.Settings.SV)
+	nonBlockingSync := sync && enableNonBlockingRaftLogSync.Get(&s.Settings.SV)
+	if nonBlockingSync {
+		// If non-blocking synchronization is enabled, apply the batched updates to
+		// the engine and initiate a synchronous disk write, but don't wait for the
+		// write to complete. Instead, enqueue that waiting on the SyncWaiterLoop,
+		// who will signal the callback when the write completes.
+		if err := batch.CommitNoSyncWait(); err != nil {
+			const expl = "while committing batch without sync wait"
+			return RaftState{}, errors.Wrap(err, expl)
+		}
+		stats.PebbleEnd = timeutil.Now()
+		s.SyncWaiter.enqueue(ctx, batch, func() {
+			// NOTE: run on the SyncWaiterLoop goroutine.
+			logCommitEnd := timeutil.Now()
+			s.Metrics.RaftLogCommitLatency.RecordValue(logCommitEnd.Sub(stats.PebbleBegin).Nanoseconds())
+			cb.OnLogSync(ctx, m.Responses)
+		})
+		// Do not Close batch on return. Will be Closed by SyncWaiterLoop.
+		batch = nil
+	} else {
+		if err := batch.Commit(sync); err != nil {
+			const expl = "while committing batch"
+			return RaftState{}, errors.Wrap(err, expl)
+		}
+		stats.PebbleEnd = timeutil.Now()
+		if mustSync {
+			logCommitEnd := stats.PebbleEnd
			s.Metrics.RaftLogCommitLatency.RecordValue(logCommitEnd.Sub(stats.PebbleBegin).Nanoseconds())
+			cb.OnLogSync(ctx, m.Responses)
+		}
 	}
 	stats.Sync = sync
-	stats.PebbleEnd = timeutil.Now()
-	if rd.MustSync {
-		s.Metrics.RaftLogCommitLatency.RecordValue(stats.PebbleEnd.Sub(stats.PebbleBegin).Nanoseconds())
-	}
+	stats.NonBlocking = nonBlockingSync
 
-	if len(rd.Entries) > 0 {
+	// TODO BEFORE MERGE: how does this work with CommitNoSyncWait()? Do we need
+	// to wait for the sync to finish before purging sideloaded entries? We don't
+	// want to end up with log entries without their corresponding sideloaded
+	// SSTables. What do we do with stats?
+	if len(m.Entries) > 0 {
 		// We may have just overwritten parts of the log which contain
 		// sideloaded SSTables from a previous term (and perhaps discarded some
 		// entries that we didn't overwrite). Remove any such leftover on-disk
 		// payloads (we can do that now because we've committed the deletion
 		// just above).
-		firstPurge := rd.Entries[0].Index // first new entry written
-		purgeTerm := rd.Entries[0].Term - 1
+		firstPurge := m.Entries[0].Index // first new entry written
+		purgeTerm := m.Entries[0].Term - 1
 		lastPurge := prevLastIndex // old end of the log, include in deletion
 		purgedSize, err := maybePurgeSideloaded(ctx, s.Sideload, firstPurge, lastPurge, purgeTerm)
 		if err != nil {
@@ -217,7 +283,7 @@ func (s *LogStore) storeEntriesAndCommitBatch(
 
 	// Update raft log entry cache. We clear any older, uncommitted log entries
 	// and cache the latest ones.
-	s.EntryCache.Add(s.RangeID, rd.Entries, true /* truncate */)
+	s.EntryCache.Add(s.RangeID, m.Entries, true /* truncate */)
 
 	return state, nil
 }
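
As a reading aid for the `SyncCallback` hook introduced above, the following is a hypothetical caller-side sketch under simplified types: a callback that routes durable-append responses either back to the local Raft group or out to remote peers. The `message` and `replicaSyncCallback` types are stand-ins for `raftpb.Message` and the kvserver `Replica`, not the actual implementation.

```go
package main

import (
	"context"
	"fmt"
)

// message is a simplified stand-in for raftpb.Message.
type message struct {
	To uint64
}

// syncCallback mirrors the shape of the SyncCallback interface added above: it
// receives the MsgStorageAppend response messages once the append is durable.
type syncCallback interface {
	OnLogSync(context.Context, []message)
}

// replicaSyncCallback is a hypothetical implementation: responses addressed to
// the local replica are stepped back into its Raft group, the rest are sent to
// remote peers via the Raft transport.
type replicaSyncCallback struct {
	localID uint64
}

func (c replicaSyncCallback) OnLogSync(ctx context.Context, msgs []message) {
	for _, m := range msgs {
		if m.To == c.localID {
			fmt.Println("local response: step MsgStorageAppendResp into the RawNode")
		} else {
			fmt.Printf("remote response: send to peer %d via the Raft transport\n", m.To)
		}
	}
}

func main() {
	var cb syncCallback = replicaSyncCallback{localID: 1}
	cb.OnLogSync(context.Background(), []message{{To: 1}, {To: 2}})
}
```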

pkg/kv/kvserver/logstore/logstore_bench_test.go

Lines changed: 11 additions & 9 deletions
@@ -36,6 +36,10 @@ func (b *discardBatch) Commit(bool) error {
 	return nil
 }
 
+type noopSyncCallback struct{}
+
+func (noopSyncCallback) OnLogSync(context.Context, []raftpb.Message) {}
+
 func BenchmarkLogStore_StoreEntries(b *testing.B) {
 	defer log.Scope(b).Close(b)
 	const kb = 1 << 10
@@ -48,23 +52,25 @@ func BenchmarkLogStore_StoreEntries(b *testing.B) {
 }
 
 func runBenchmarkLogStore_StoreEntries(b *testing.B, bytes int64) {
+	ctx := context.Background()
 	const tenMB = 10 * 1 << 20
 	ec := raftentry.NewCache(tenMB)
 	const rangeID = 1
 	eng := storage.NewDefaultInMemForTesting()
 	defer eng.Close()
+	st := cluster.MakeTestingClusterSettings()
+	enableNonBlockingRaftLogSync.Override(ctx, &st.SV, false)
 	s := LogStore{
 		RangeID:     rangeID,
 		Engine:      eng,
 		StateLoader: NewStateLoader(rangeID),
 		EntryCache:  ec,
-		Settings:    cluster.MakeTestingClusterSettings(),
+		Settings:    st,
 		Metrics: Metrics{
 			RaftLogCommitLatency: metric.NewHistogram(metric.Metadata{}, 10*time.Second, metric.IOLatencyBuckets),
 		},
 	}
 
-	ctx := context.Background()
 	rs := RaftState{
 		LastTerm: 1,
 		ByteSize: 0,
@@ -89,17 +95,13 @@ func runBenchmarkLogStore_StoreEntries(b *testing.B, bytes int64) {
 	batch := &discardBatch{}
 	for i := 0; i < b.N; i++ {
 		batch.Batch = newStoreEntriesBatch(eng)
-		rd := Ready{
-			HardState: raftpb.HardState{},
-			Entries:   ents,
-			MustSync:  true,
-		}
+		m := MsgStorageAppend{Entries: ents}
+		cb := noopSyncCallback{}
 		var err error
-		rs, err = s.storeEntriesAndCommitBatch(ctx, rs, rd, stats, batch)
+		rs, err = s.storeEntriesAndCommitBatch(ctx, rs, m, cb, stats, batch)
 		if err != nil {
 			b.Fatal(err)
 		}
-		batch.Batch.Close()
 		ents[0].Index++
 	}
 	require.EqualValues(b, b.N, rs.LastIndex)