@@ -80,6 +80,9 @@ type DB struct {
8080 mtx sync.Mutex
8181 // worker goroutine IdleTimeout = 5s
8282 snapshotWriterPool * pond.WorkerPool
83+
84+ // reusable write batch
85+ wbatch wal.Batch
8386}
8487
8588type Options struct {
@@ -440,8 +443,13 @@ func (db *DB) checkBackgroundSnapshotRewrite() error {
440443 db .snapshotRewriteCancel = nil
441444
442445 if result .mtree == nil {
443- // background snapshot rewrite failed
444- return fmt .Errorf ("background snapshot rewriting failed: %w" , result .err )
446+ if result .err != nil {
447+ // background snapshot rewrite failed
448+ return fmt .Errorf ("background snapshot rewriting failed: %w" , result .err )
449+ }
450+
451+ // background snapshot rewrite don't success, but no error to propagate, ignore it.
452+ return nil
445453 }
446454
447455 // wait for potential pending wal writings to finish, to make sure we catch up to latest state.
@@ -556,11 +564,17 @@ func (db *DB) Commit() (int64, error) {
556564 // async wal writing
557565 db .walChan <- & entry
558566 } else {
559- bz , err := entry . data . Marshal ()
567+ lastIndex , err := db . wal . LastIndex ()
560568 if err != nil {
561569 return 0 , err
562570 }
563- if err := db .wal .Write (entry .index , bz ); err != nil {
571+
572+ db .wbatch .Clear ()
573+ if err := writeEntry (& db .wbatch , db .logger , lastIndex , & entry ); err != nil {
574+ return 0 , err
575+ }
576+
577+ if err := db .wal .WriteBatch (& db .wbatch ); err != nil {
564578 return 0 , err
565579 }
566580 }
@@ -591,13 +605,17 @@ func (db *DB) initAsyncCommit() {
591605 break
592606 }
593607
608+ lastIndex , err := db .wal .LastIndex ()
609+ if err != nil {
610+ walQuit <- err
611+ return
612+ }
613+
594614 for _ , entry := range entries {
595- bz , err := entry .data .Marshal ()
596- if err != nil {
615+ if err := writeEntry (& batch , db .logger , lastIndex , entry ); err != nil {
597616 walQuit <- err
598617 return
599618 }
600- batch .Write (entry .index , bz )
601619 }
602620
603621 if err := db .wal .WriteBatch (& batch ); err != nil {
@@ -749,7 +767,8 @@ func (db *DB) rewriteSnapshotBackground() error {
749767
750768 cloned .logger .Info ("start rewriting snapshot" , "version" , cloned .Version ())
751769 if err := cloned .RewriteSnapshotWithContext (ctx ); err != nil {
752- ch <- snapshotResult {err : err }
770+ // write error log but don't stop the client, it could happen when load an old version.
771+ cloned .logger .Error ("failed to rewrite snapshot" , "err" , err )
753772 return
754773 }
755774 cloned .logger .Info ("finished rewriting snapshot" , "version" , cloned .Version ())
@@ -1093,3 +1112,17 @@ func channelBatchRecv[T any](ch <-chan *T) []*T {
10931112
10941113 return result
10951114}
1115+
1116+ func writeEntry (batch * wal.Batch , logger Logger , lastIndex uint64 , entry * walEntry ) error {
1117+ bz , err := entry .data .Marshal ()
1118+ if err != nil {
1119+ return err
1120+ }
1121+
1122+ if entry .index <= lastIndex {
1123+ logger .Error ("commit old version idempotently" , "lastIndex" , lastIndex , "version" , entry .index )
1124+ } else {
1125+ batch .Write (entry .index , bz )
1126+ }
1127+ return nil
1128+ }
0 commit comments