@@ -88,6 +88,10 @@ type FSM struct {
8888 storeLatestState bool
8989
9090 chunker * raftchunking.ChunkingBatchingFSM
91+
92+ // testSnapshotRestoreError is used in tests to simulate an error while
93+ // restoring a snapshot.
94+ testSnapshotRestoreError bool
9195}
9296
9397// NewFSM constructs a FSM using the given directory
@@ -193,20 +197,20 @@ func (f *FSM) witnessIndex(i *IndexValue) {
193197 }
194198}
195199
196- func (f * FSM ) witnessSnapshot (index , term , configurationIndex uint64 , configuration raft.Configuration ) error {
200+ func (f * FSM ) witnessSnapshot (metadata * raft.SnapshotMeta ) error {
197201 var indexBytes []byte
198202 latestIndex , _ := f .LatestState ()
199203
200- latestIndex .Index = index
201- latestIndex .Term = term
204+ latestIndex .Index = metadata . Index
205+ latestIndex .Term = metadata . Term
202206
203207 var err error
204208 indexBytes , err = proto .Marshal (latestIndex )
205209 if err != nil {
206210 return err
207211 }
208212
209- protoConfig := raftConfigurationToProtoConfiguration (configurationIndex , configuration )
213+ protoConfig := raftConfigurationToProtoConfiguration (metadata . ConfigurationIndex , metadata . Configuration )
210214 configBytes , err := proto .Marshal (protoConfig )
211215 if err != nil {
212216 return err
@@ -232,16 +236,16 @@ func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configurat
232236 }
233237 }
234238
235- atomic .StoreUint64 (f .latestIndex , index )
236- atomic .StoreUint64 (f .latestTerm , term )
239+ atomic .StoreUint64 (f .latestIndex , metadata . Index )
240+ atomic .StoreUint64 (f .latestTerm , metadata . Term )
237241 f .latestConfig .Store (protoConfig )
238242
239243 return nil
240244}
241245
242246// Delete deletes the given key from the bolt file.
243247func (f * FSM ) Delete (ctx context.Context , path string ) error {
244- defer metrics .MeasureSince ([]string {"raft " , "delete" }, time .Now ())
248+ defer metrics .MeasureSince ([]string {"raft_storage" , "fsm " , "delete" }, time .Now ())
245249
246250 f .l .RLock ()
247251 defer f .l .RUnlock ()
@@ -253,7 +257,7 @@ func (f *FSM) Delete(ctx context.Context, path string) error {
253257
254258// Delete deletes the given key from the bolt file.
255259func (f * FSM ) DeletePrefix (ctx context.Context , prefix string ) error {
256- defer metrics .MeasureSince ([]string {"raft " , "delete_prefix" }, time .Now ())
260+ defer metrics .MeasureSince ([]string {"raft_storage" , "fsm " , "delete_prefix" }, time .Now ())
257261
258262 f .l .RLock ()
259263 defer f .l .RUnlock ()
@@ -277,7 +281,9 @@ func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error {
277281
278282// Get retrieves the value at the given path from the bolt file.
279283func (f * FSM ) Get (ctx context.Context , path string ) (* physical.Entry , error ) {
284+ // TODO: Remove this outdated metric name in an older release
280285 defer metrics .MeasureSince ([]string {"raft" , "get" }, time .Now ())
286+ defer metrics .MeasureSince ([]string {"raft_storage" , "fsm" , "get" }, time .Now ())
281287
282288 f .l .RLock ()
283289 defer f .l .RUnlock ()
@@ -311,7 +317,7 @@ func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) {
311317
312318// Put writes the given entry to the bolt file.
313319func (f * FSM ) Put (ctx context.Context , entry * physical.Entry ) error {
314- defer metrics .MeasureSince ([]string {"raft " , "put" }, time .Now ())
320+ defer metrics .MeasureSince ([]string {"raft_storage" , "fsm " , "put" }, time .Now ())
315321
316322 f .l .RLock ()
317323 defer f .l .RUnlock ()
@@ -324,7 +330,9 @@ func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {
324330
325331// List retrieves the set of keys with the given prefix from the bolt file.
326332func (f * FSM ) List (ctx context.Context , prefix string ) ([]string , error ) {
333+ // TODO: Remove this outdated metric name in a future release
327334 defer metrics .MeasureSince ([]string {"raft" , "list" }, time .Now ())
335+ defer metrics .MeasureSince ([]string {"raft_storage" , "fsm" , "list" }, time .Now ())
328336
329337 f .l .RLock ()
330338 defer f .l .RUnlock ()
@@ -531,6 +539,8 @@ type writeErrorCloser interface {
531539// (size, checksum, etc) and a second for the sink of the data. We also use a
532540// proto delimited writer so we can stream proto messages to the sink.
533541func (f * FSM ) writeTo (ctx context.Context , metaSink writeErrorCloser , sink writeErrorCloser ) {
542+ defer metrics .MeasureSince ([]string {"raft_storage" , "fsm" , "write_snapshot" }, time .Now ())
543+
534544 protoWriter := protoio .NewDelimitedWriter (sink )
535545 metadataProtoWriter := protoio .NewDelimitedWriter (metaSink )
536546
@@ -573,7 +583,9 @@ func (f *FSM) writeTo(ctx context.Context, metaSink writeErrorCloser, sink write
573583
574584// Snapshot implements the FSM interface. It returns a noop snapshot object.
575585func (f * FSM ) Snapshot () (raft.FSMSnapshot , error ) {
576- return & noopSnapshotter {}, nil
586+ return & noopSnapshotter {
587+ fsm : f ,
588+ }, nil
577589}
578590
579591// SetNoopRestore is used to disable restore operations on raft startup. Because
@@ -589,48 +601,91 @@ func (f *FSM) SetNoopRestore(enabled bool) {
589601// first deletes the existing bucket to clear all existing data, then recreates
590602// it so we can copy in the snapshot.
591603func (f * FSM ) Restore (r io.ReadCloser ) error {
604+ defer metrics .MeasureSince ([]string {"raft_storage" , "fsm" , "restore_snapshot" }, time .Now ())
605+
592606 if f .noopRestore == true {
593607 return nil
594608 }
595609
610+ snapMeta := r .(* boltSnapshotMetadataReader ).Metadata ()
611+
596612 protoReader := protoio .NewDelimitedReader (r , math .MaxInt32 )
597613 defer protoReader .Close ()
598614
599615 f .l .Lock ()
600616 defer f .l .Unlock ()
601617
602- // Start a write transaction.
618+ // Delete the existing data bucket and create a new one.
619+ f .logger .Debug ("snapshot restore: deleting bucket" )
603620 err := f .db .Update (func (tx * bolt.Tx ) error {
604621 err := tx .DeleteBucket (dataBucketName )
605622 if err != nil {
606623 return err
607624 }
608625
609- b , err : = tx .CreateBucket (dataBucketName )
626+ _ , err = tx .CreateBucket (dataBucketName )
610627 if err != nil {
611628 return err
612629 }
613630
614- for {
631+ return nil
632+ })
633+ if err != nil {
634+ f .logger .Error ("could not restore snapshot: could not clear existing bucket" , "error" , err )
635+ return err
636+ }
637+
638+ // If we are testing a failed snapshot error here.
639+ if f .testSnapshotRestoreError {
640+ return errors .New ("Test error" )
641+ }
642+
643+ f .logger .Debug ("snapshot restore: deleting bucket done" )
644+ f .logger .Debug ("snapshot restore: writing keys" )
645+
646+ var done bool
647+ var keys int
648+ for ! done {
649+ err := f .db .Update (func (tx * bolt.Tx ) error {
650+ b := tx .Bucket (dataBucketName )
615651 s := new (pb.StorageEntry )
616- err := protoReader .ReadMsg (s )
617- if err != nil {
618- if err == io .EOF {
619- return nil
652+
653+ // Commit in batches of 50k. Bolt holds all the data in memory and
654+ // doesn't split the pages until commit so we do incremental writes.
655+ // This is safe since we have a write lock on the fsm's lock.
656+ for i := 0 ; i < 50000 ; i ++ {
657+ err := protoReader .ReadMsg (s )
658+ if err != nil {
659+ if err == io .EOF {
660+ done = true
661+ return nil
662+ }
663+ return err
620664 }
621- return err
622- }
623665
624- err = b .Put ([]byte (s .Key ), s .Value )
625- if err != nil {
626- return err
666+ err = b .Put ([]byte (s .Key ), s .Value )
667+ if err != nil {
668+ return err
669+ }
670+ keys += 1
627671 }
672+
673+ return nil
674+ })
675+ if err != nil {
676+ f .logger .Error ("could not restore snapshot" , "error" , err )
677+ return err
628678 }
629679
630- return nil
631- })
632- if err != nil {
633- f .logger .Error ("could not restore snapshot" , "error" , err )
680+ f .logger .Trace ("snapshot restore: writing keys" , "num_written" , keys )
681+ }
682+
683+ f .logger .Debug ("snapshot restore: writing keys done" )
684+
685+ // Write the metadata after we have applied all the snapshot data
686+ f .logger .Debug ("snapshot restore: writing metadata" )
687+ if err := f .witnessSnapshot (snapMeta ); err != nil {
688+ f .logger .Error ("could not write metadata" , "error" , err )
634689 return err
635690 }
636691
@@ -639,10 +694,23 @@ func (f *FSM) Restore(r io.ReadCloser) error {
639694
640695// noopSnapshotter implements the fsm.Snapshot interface. It doesn't do anything
641696// since our SnapshotStore reads data out of the FSM on Open().
642- type noopSnapshotter struct {}
697+ type noopSnapshotter struct {
698+ fsm * FSM
699+ }
643700
644- // Persist doesn't do anything.
701+ // Persist implements the fsm.Snapshot interface. It doesn't need to persist any
702+ // state data, but it does persist the raft metadata. This is necessary so we
703+ // can be sure to capture indexes for operation types that are not sent to the
704+ // FSM.
645705func (s * noopSnapshotter ) Persist (sink raft.SnapshotSink ) error {
706+ boltSnapshotSink := sink .(* BoltSnapshotSink )
707+
708+ // We are processing a snapshot, fastforward the index, term, and
709+ // configuration to the latest seen by the raft system.
710+ if err := s .fsm .witnessSnapshot (& boltSnapshotSink .meta ); err != nil {
711+ return err
712+ }
713+
646714 return nil
647715}
648716
0 commit comments