Skip to content

Commit 989cce8

Browse files
committed
etcdserver: integrate watchdog with all storage read/write operations
Signed-off-by: Benjamin Wang <[email protected]>
1 parent 5df72e2 commit 989cce8

File tree

3 files changed

+33
-3
lines changed

3 files changed

+33
-3
lines changed

server/etcdserver/api/snap/snapshotter.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"go.etcd.io/etcd/pkg/v3/pbutil"
3131
"go.etcd.io/etcd/server/v3/etcdserver/api/snap/snappb"
3232
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
33+
"go.etcd.io/etcd/server/v3/watchdog"
3334
"go.etcd.io/raft/v3"
3435
"go.etcd.io/raft/v3/raftpb"
3536

@@ -88,7 +89,9 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
8889
spath := filepath.Join(s.dir, fname)
8990

9091
fsyncStart := time.Now()
92+
cancel := watchdog.Register("save v2 snapshot")
9193
err = pioutil.WriteAndSyncFile(spath, d, 0666)
94+
cancel()
9295
snapFsyncSec.Observe(time.Since(fsyncStart).Seconds())
9396

9497
if err != nil {

server/storage/backend/batch_tx.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"go.uber.org/zap"
2525

2626
bolt "go.etcd.io/bbolt"
27+
"go.etcd.io/etcd/server/v3/watchdog"
2728
)
2829

2930
type BucketID int
@@ -114,7 +115,9 @@ func (t *batchTx) RUnlock() {
114115
}
115116

116117
func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
118+
cancel := watchdog.Register("batchTx createBucket")
117119
_, err := t.tx.CreateBucket(bucket.Name())
120+
cancel()
118121
if err != nil && err != bolt.ErrBucketExists {
119122
t.backend.lg.Fatal(
120123
"failed to create a bucket",
@@ -126,7 +129,9 @@ func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
126129
}
127130

128131
func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) {
132+
cancel := watchdog.Register("batchTx deleteBucket")
129133
err := t.tx.DeleteBucket(bucket.Name())
134+
cancel()
130135
if err != nil && err != bolt.ErrBucketNotFound {
131136
t.backend.lg.Fatal(
132137
"failed to delete a bucket",
@@ -161,7 +166,11 @@ func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq boo
161166
// this can delay the page split and reduce space usage.
162167
bucket.FillPercent = 0.9
163168
}
164-
if err := bucket.Put(key, value); err != nil {
169+
170+
cancel := watchdog.Register("batchTx put")
171+
err := bucket.Put(key, value)
172+
cancel()
173+
if err != nil {
165174
t.backend.lg.Fatal(
166175
"failed to write to a bucket",
167176
zap.Stringer("bucket-name", bucketType),
@@ -216,7 +225,9 @@ func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) {
216225
zap.Stack("stack"),
217226
)
218227
}
228+
cancel := watchdog.Register("batchTx delete")
219229
err := bucket.Delete(key)
230+
cancel()
220231
if err != nil {
221232
t.backend.lg.Fatal(
222233
"failed to delete a key",
@@ -268,9 +279,11 @@ func (t *batchTx) commit(stop bool) {
268279

269280
start := time.Now()
270281

282+
cancel := watchdog.Register("batchTx commit")
271283
// gofail: var beforeCommit struct{}
272284
err := t.tx.Commit()
273285
// gofail: var afterCommit struct{}
286+
cancel()
274287

275288
rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
276289
spillSec.Observe(t.tx.Stats().SpillTime.Seconds())

server/storage/wal/wal.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"go.etcd.io/etcd/client/pkg/v3/fileutil"
3030
"go.etcd.io/etcd/pkg/v3/pbutil"
3131
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
32+
"go.etcd.io/etcd/server/v3/watchdog"
3233
"go.etcd.io/raft/v3"
3334
"go.etcd.io/raft/v3/raftpb"
3435

@@ -798,6 +799,8 @@ func (w *WAL) cut() error {
798799
}
799800

800801
func (w *WAL) sync() error {
802+
cancel := watchdog.Register("WAL sync")
803+
defer cancel()
801804
if w.encoder != nil {
802805
if err := w.encoder.flush(); err != nil {
803806
return err
@@ -935,12 +938,19 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
935938
mustSync := raft.MustSync(st, w.state, len(ents))
936939

937940
// TODO(xiangli): no more reference operator
941+
cancel := watchdog.Register("WAL saveEntry")
938942
for i := range ents {
939943
if err := w.saveEntry(&ents[i]); err != nil {
944+
cancel()
940945
return err
941946
}
942947
}
943-
if err := w.saveState(&st); err != nil {
948+
cancel()
949+
950+
cancel = watchdog.Register("WAL saveState")
951+
err := w.saveState(&st)
952+
cancel()
953+
if err != nil {
944954
return err
945955
}
946956

@@ -972,9 +982,13 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
972982
defer w.mu.Unlock()
973983

974984
rec := &walpb.Record{Type: SnapshotType, Data: b}
975-
if err := w.encoder.encode(rec); err != nil {
985+
cancel := watchdog.Register("WAL saveSnapshot")
986+
err := w.encoder.encode(rec)
987+
cancel()
988+
if err != nil {
976989
return err
977990
}
991+
978992
// update enti only when snapshot is ahead of last index
979993
if w.enti < e.Index {
980994
w.enti = e.Index

0 commit comments

Comments
 (0)