From 58e63d96f27f466c8e6c1c65538854ef19626356 Mon Sep 17 00:00:00 2001 From: jiefenghuang Date: Wed, 19 Mar 2025 15:56:48 +0800 Subject: [PATCH 1/3] cmd/gc: scan leaked inodes and edges Signed-off-by: jiefenghuang --- cmd/fsck.go | 2 +- cmd/gc.go | 2 +- pkg/meta/base_test.go | 93 +++++++++++++++++++++++++++++++++++++++++-- pkg/meta/interface.go | 2 +- pkg/meta/redis.go | 60 +++++++++++++++++++++------- pkg/meta/sql.go | 62 ++++++++++++++++++++++++++++- pkg/meta/tkv.go | 62 ++++++++++++++++++++++++++++- 7 files changed, 260 insertions(+), 23 deletions(-) diff --git a/cmd/fsck.go b/cmd/fsck.go index a3e0b6502335..21fdd345a0f7 100644 --- a/cmd/fsck.go +++ b/cmd/fsck.go @@ -133,7 +133,7 @@ func fsck(ctx *cli.Context) error { // List all slices in metadata engine sliceCSpin := progress.AddCountSpinner("Listed slices") slices := make(map[meta.Ino][]meta.Slice) - r := m.ListSlices(c, slices, false, false, sliceCSpin.Increment) + r := m.ListSlices(c, slices, false, false, false, sliceCSpin.Increment) if r != 0 { logger.Fatalf("list all slices: %s", r) } diff --git a/cmd/gc.go b/cmd/gc.go index dccef33222af..8242d080ad97 100644 --- a/cmd/gc.go +++ b/cmd/gc.go @@ -196,7 +196,7 @@ func gc(ctx *cli.Context) error { // List all slices in metadata engine slices := make(map[meta.Ino][]meta.Slice) - r := m.ListSlices(c, slices, true, delete, sliceCSpin.Increment) + r := m.ListSlices(c, slices, true, true, delete, sliceCSpin.Increment) if r != 0 { logger.Fatalf("list all slices: %s", r) } diff --git a/pkg/meta/base_test.go b/pkg/meta/base_test.go index b08607e00611..d5f0710d6265 100644 --- a/pkg/meta/base_test.go +++ b/pkg/meta/base_test.go @@ -124,6 +124,7 @@ func testMeta(t *testing.T, m Meta) { } testMetaClient(t, m) + testCleanLeakedObjects(t, m) testTruncateAndDelete(t, m) testTrash(t, m) testParents(t, m) @@ -1491,7 +1492,7 @@ func testCompaction(t *testing.T, m Meta, trash bool) { } p.Done() sliceMap := make(map[Ino][]Slice) - if st := m.ListSlices(ctx, sliceMap, false, false, nil); st != 0 { + if st := m.ListSlices(ctx, sliceMap, false, false, false, nil); st != 0 { t.Fatalf("list all slices: %s", st) } @@ -1700,7 +1701,7 @@ func testTruncateAndDelete(t *testing.T, m Meta) { } var total int64 slices := make(map[Ino][]Slice) - m.ListSlices(ctx, slices, false, false, func() { total++ }) + m.ListSlices(ctx, slices, false, false, false, func() { total++ }) var totalSlices int for _, ss := range slices { totalSlices += len(ss) @@ -1715,7 +1716,7 @@ func testTruncateAndDelete(t *testing.T, m Meta) { time.Sleep(time.Millisecond * 100) slices = make(map[Ino][]Slice) - m.ListSlices(ctx, slices, false, false, nil) + m.ListSlices(ctx, slices, false, false, false, nil) totalSlices = 0 for _, ss := range slices { totalSlices += len(ss) @@ -3284,6 +3285,92 @@ func TestSymlinkCache(t *testing.T) { require.Equal(t, int32(8000), cache.size.Load()) } +func testCleanLeakedObjects(t *testing.T, m Meta) { + if err := m.Init(testFormat(), false); err != nil { + t.Fatalf("init error: %s", err) + } + + ctx := Background() + var parent Ino + var pattr Attr + pname := "d1" + if st := m.Mkdir(ctx, RootInode, pname, 0755, 0, 0, &parent, &pattr); st != 0 { + t.Fatalf("Mkdir %s: %s", pname, st) + } + defer m.Rmdir(ctx, RootInode, pname) + + num := 5 + cnames := make([]string, num) + cnodes := make([]Ino, num) + cattr := make([]Attr, num) + for i := 0; i < num; i++ { + cnames[i] = fmt.Sprintf("f%d", i+1) + if st := m.Mknod(ctx, parent, cnames[i], TypeFile, 0755, 0, 0, "", &cnodes[i], &cattr[i]); st != 0 { + t.Fatalf("Mknod %s: %s", cnames[i], st) + } + defer m.Unlink(ctx, parent, cnames[i]) + } + + time.Sleep(time.Second) + // delete cnodes[0]'s edge and cnodes[1]'s node + switch m.Name() { + case "redis": + meta := m.getBase().en.(*redisMeta) + require.Nil(t, meta.rdb.HDel(ctx, meta.entryKey(parent), cnames[0]).Err()) + require.Nil(t, meta.rdb.Del(ctx, meta.inodeKey(cnodes[1])).Err()) + meta.cleanupLeakedInodesAndEdges(true, 0) + + require.Equal(t, redis.Nil, meta.rdb.Get(ctx, meta.inodeKey(cnodes[0])).Err()) + require.Equal(t, redis.Nil, meta.rdb.HGet(ctx, meta.entryKey(parent), cnames[1]).Err()) + case "badger": + meta := m.getBase().en.(*kvMeta) + meta.client.txn(Background(), func(tx *kvTxn) error { + tx.delete(meta.entryKey(parent, cnames[0])) + tx.delete(meta.inodeKey(cnodes[1])) + return nil + }, 0) + meta.cleanupLeakedInodesAndEdges(true, 0) + meta.client.txn(Background(), func(tx *kvTxn) error { + require.Nil(t, tx.get(meta.inodeKey(cnodes[0]))) + require.Nil(t, tx.get(meta.entryKey(parent, cnames[1]))) + return nil + }, 0) + case "sqlite3": + meta := m.getBase().en.(*dbMeta) + _, err := meta.db.Delete(&edge{Parent: parent, Name: []byte(cnames[0])}) + require.Nil(t, err) + _, err = meta.db.Delete(&node{Inode: cnodes[1]}) + require.Nil(t, err) + meta.cleanupLeakedInodesAndEdges(true, 0) + ok, err := meta.db.Exist(&node{Inode: cnodes[0]}) + require.Nil(t, err) + require.False(t, ok) + ok, err = meta.db.Exist(&edge{Parent: parent, Name: []byte(cnames[1])}) + require.Nil(t, err) + require.False(t, ok) + default: + return + } + + var entries []*Entry + if st := m.Readdir(ctx, parent, 0, &entries); st != 0 { + t.Fatalf("Readdir %s: %s", pname, st) + } + require.Equal(t, num, len(entries)) + expected := map[Ino]bool{ + RootInode: true, + parent: true, + cnodes[2]: true, + cnodes[3]: true, + cnodes[4]: true, + } + for i := 0; i < num; i++ { + if _, ok := expected[entries[i].Inode]; !ok { + t.Fatalf("Unexpected entry %s", entries[i].Name) + } + } +} + func TestTxBatchLock(t *testing.T) { var base baseMeta // 0 inode diff --git a/pkg/meta/interface.go b/pkg/meta/interface.go index 702750fc5291..985180b60d3f 100644 --- a/pkg/meta/interface.go +++ b/pkg/meta/interface.go @@ -489,7 +489,7 @@ type Meta interface { Compact(ctx Context, inode Ino, concurrency int, preFunc, postFunc func()) syscall.Errno // ListSlices returns all slices used by all files. - ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, delete bool, showProgress func()) syscall.Errno + ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, scanLeaked, delete bool, showProgress func()) syscall.Errno // Remove all files and directories recursively. // count represents the number of attempted deletions of entries (even if failed). Remove(ctx Context, parent Ino, name string, skipTrash bool, numThreads int, count *uint64) syscall.Errno diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index 4e68384cf8f2..bec71ab086e7 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -3052,15 +3052,19 @@ func (m *redisMeta) scanAllChunks(ctx Context, ch chan<- cchunk, bar *utils.Bar) }) } -func (m *redisMeta) cleanupLeakedInodes(delete bool) { +func (m *redisMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration) { + type entry struct { + Parent Ino + Name string + } var ctx = Background() - var foundInodes = make(map[Ino]struct{}) - foundInodes[RootInode] = struct{}{} - foundInodes[TrashInode] = struct{}{} - cutoff := time.Now().Add(time.Hour * -1) + var foundInodes = make(map[Ino]*entry) + foundInodes[RootInode] = &entry{Parent: RootInode, Name: "/"} + foundInodes[TrashInode] = &entry{Parent: RootInode, Name: ".trash"} + cutoff := time.Now().Add(-before) prefix := len(m.prefix) - _ = m.scan(ctx, "d[0-9]*", func(keys []string) error { + if err := m.scan(ctx, "d[0-9]*", func(keys []string) error { for _, key := range keys { ino, _ := strconv.Atoi(key[prefix+1:]) var entries []*Entry @@ -3070,12 +3074,16 @@ func (m *redisMeta) cleanupLeakedInodes(delete bool) { return eno } for _, e := range entries { - foundInodes[e.Inode] = struct{}{} + foundInodes[e.Inode] = &entry{Parent: Ino(ino), Name: string(e.Name)} } } return nil - }) - _ = m.scan(ctx, "i*", func(keys []string) error { + }); err != nil { + logger.Errorf("scan directories: %s", err) + return + } + + if err := m.scan(ctx, "i*", func(keys []string) error { values, err := m.rdb.MGet(ctx, keys...).Result() if err != nil { logger.Warnf("mget inodes: %s", err) @@ -3089,17 +3097,37 @@ func (m *redisMeta) cleanupLeakedInodes(delete bool) { m.parseAttr([]byte(v.(string)), &attr) ino, _ := strconv.Atoi(keys[i][prefix+1:]) if _, ok := foundInodes[Ino(ino)]; !ok && time.Unix(attr.Ctime, 0).Before(cutoff) { - logger.Infof("found dangling inode: %s %+v", keys[i], attr) - if delete { + logger.Infof("found leaded inode: %d %+v", ino, attr) + if clean { err = m.doDeleteSustainedInode(0, Ino(ino)) if err != nil { logger.Errorf("delete leaked inode %d : %s", ino, err) } } } + foundInodes[Ino(ino)] = nil } return nil - }) + }); err != nil { + logger.Errorf("scan inodes: %s", err) + return + } + + foundInodes[RootInode], foundInodes[TrashInode] = nil, nil + pipe := m.rdb.Pipeline() + for c, e := range foundInodes { + if e != nil { + logger.Infof("found leaked edge %d -> (%d, %s)", e.Parent, c, e.Name) + if clean { + pipe.HDel(ctx, m.entryKey(e.Parent), e.Name) + } + } + } + if pipe.Len() > 0 { + if _, err := pipe.Exec(ctx); err != nil { + logger.Errorf("delete leaked edges: %s", err) + } + } } func (m *redisMeta) scan(ctx context.Context, pattern string, f func([]string) error) error { @@ -3155,9 +3183,11 @@ func (m *redisMeta) hscan(ctx context.Context, key string, f func([]string) erro return nil } -func (m *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, delete bool, showProgress func()) syscall.Errno { - m.cleanupLeakedInodes(delete) - m.cleanupLeakedChunks(delete) +func (m *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, scanLeaked, delete bool, showProgress func()) syscall.Errno { + if scanLeaked { + m.cleanupLeakedInodesAndEdges(delete, time.Hour) + m.cleanupLeakedChunks(delete) + } m.cleanupOldSliceRefs(delete) if delete { m.doCleanupSlices() diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index fb10d8d74756..faa5e8adc02b 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -3149,7 +3149,67 @@ func (m *dbMeta) scanAllChunks(ctx Context, ch chan<- cchunk, bar *utils.Bar) er }) } -func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, delete bool, showProgress func()) syscall.Errno { +func (m *dbMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration) { + type entry struct { + Parent Ino + Name []byte + } + var foundInodes = make(map[Ino]*entry) + foundInodes[RootInode] = &entry{Parent: RootInode, Name: []byte("/")} + foundInodes[TrashInode] = &entry{Parent: RootInode, Name: []byte(".trash")} + cutoff := time.Now().Add(-before) + + var edges []edge + var nodes []node + if err := m.txn(func(s *xorm.Session) error { + if err := s.Find(&edges); err != nil { + return err + } + for _, edge := range edges { + foundInodes[edge.Inode] = &entry{Parent: edge.Parent, Name: edge.Name} + } + if err := s.Find(&nodes); err != nil { + return err + } + return nil + }); err != nil { + logger.Errorf("scan all edges and nodes: %s", err) + return + } + + for _, node := range nodes { + if _, ok := foundInodes[node.Inode]; !ok && time.Unix(node.Ctime/1e6, 0).Before(cutoff) { + logger.Infof("found leaded inode: %d %+v", node.Inode, node) + if clean { + err := m.doDeleteSustainedInode(0, node.Inode) + if err != nil { + logger.Errorf("delete leaked inode %d : %s", node.Inode, err) + } + } + } + foundInodes[node.Inode] = nil + } + + foundInodes[RootInode], foundInodes[TrashInode] = nil, nil + m.txn(func(s *xorm.Session) error { + for c, e := range foundInodes { + if e != nil { + logger.Infof("found leaked edge %d -> (%d, %s)", e.Parent, c, e.Name) + if clean { + if _, err := s.Delete(&edge{Parent: e.Parent, Name: e.Name}); err != nil { + return err + } + } + } + } + return nil + }) +} + +func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, scanLeaked, delete bool, showProgress func()) syscall.Errno { + if scanLeaked { + m.cleanupLeakedInodesAndEdges(delete, time.Hour) + } if delete { m.doCleanupSlices() } diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 9a5b91a8112e..aef65b457613 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -2439,7 +2439,67 @@ func (m *kvMeta) scanAllChunks(ctx Context, ch chan<- cchunk, bar *utils.Bar) er }) } -func (m *kvMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, delete bool, showProgress func()) syscall.Errno { +func (m *kvMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration) { + type entry struct { + Parent Ino + Name []byte + } + var foundInodes = make(map[Ino]*entry) + foundInodes[RootInode] = &entry{Parent: RootInode, Name: []byte("/")} + foundInodes[TrashInode] = &entry{Parent: RootInode, Name: []byte(".trash")} + cutoff := time.Now().Add(-before) + + nodes := make(map[Ino]*Attr) + if err := m.client.scan(nil, func(key, value []byte) { + if len(key) > 9 && key[0] == 'A' { + ino := m.decodeInode(key[1:9]) + switch key[9] { + case 'I': + attr := &Attr{Nlink: 1} + m.parseAttr(value, attr) + nodes[ino] = attr + case 'D': + name := string(key[10:]) + _, inode := m.parseEntry(value) + foundInodes[inode] = &entry{Parent: ino, Name: []byte(name)} + } + } + }); err != nil { + logger.Errorf("scan all inodes and edges: %s", err) + return + } + + for inode, attr := range nodes { + if _, ok := foundInodes[inode]; !ok && time.Unix(attr.Ctime, 0).Before(cutoff) { + logger.Infof("found leaded inode: %d %+v", inode, attr) + if clean { + err := m.doDeleteSustainedInode(0, inode) + if err != nil { + logger.Errorf("delete leaked inode %d : %s", inode, err) + } + } + } + foundInodes[inode] = nil + } + + foundInodes[RootInode], foundInodes[TrashInode] = nil, nil + m.client.txn(Background(), func(tx *kvTxn) error { + for c, e := range foundInodes { + if e != nil { + logger.Infof("found leaked edge %d -> (%d, %s)", e.Parent, c, e.Name) + if clean { + tx.delete(m.entryKey(e.Parent, string(e.Name))) + } + } + } + return nil + }, 0) +} + +func (m *kvMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, scanLeaked, delete bool, showProgress func()) syscall.Errno { + if scanLeaked { + m.cleanupLeakedInodesAndEdges(delete, time.Hour) + } if delete { m.doCleanupSlices() } From 4e84c1adbb95f02401a90cc30cc08727dfa03017 Mon Sep 17 00:00:00 2001 From: jiefenghuang Date: Wed, 19 Mar 2025 16:16:21 +0800 Subject: [PATCH 2/3] fix lint Signed-off-by: jiefenghuang --- pkg/meta/sql.go | 7 +++++-- pkg/meta/tkv.go | 7 +++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index faa5e8adc02b..d3b0a4619203 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -3191,7 +3191,7 @@ func (m *dbMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration) { } foundInodes[RootInode], foundInodes[TrashInode] = nil, nil - m.txn(func(s *xorm.Session) error { + if err := m.txn(func(s *xorm.Session) error { for c, e := range foundInodes { if e != nil { logger.Infof("found leaked edge %d -> (%d, %s)", e.Parent, c, e.Name) @@ -3203,7 +3203,10 @@ func (m *dbMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration) { } } return nil - }) + }); err != nil { + logger.Errorf("delete leaked edges: %s", err) + return + } } func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, scanLeaked, delete bool, showProgress func()) syscall.Errno { diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index aef65b457613..86a13801bb70 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -2483,7 +2483,7 @@ func (m *kvMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration) { } foundInodes[RootInode], foundInodes[TrashInode] = nil, nil - m.client.txn(Background(), func(tx *kvTxn) error { + if err := m.client.txn(Background(), func(tx *kvTxn) error { for c, e := range foundInodes { if e != nil { logger.Infof("found leaked edge %d -> (%d, %s)", e.Parent, c, e.Name) @@ -2493,7 +2493,10 @@ func (m *kvMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration) { } } return nil - }, 0) + }, 0); err != nil { + logger.Errorf("delete leaked edges: %s", err) + return + } } func (m *kvMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, scanLeaked, delete bool, showProgress func()) syscall.Errno { From 67ca95caad7e27180dfbca0e7cefefcb22fe8dfc Mon Sep 17 00:00:00 2001 From: jiefenghuang Date: Fri, 21 Mar 2025 15:05:20 +0800 Subject: [PATCH 3/3] review Signed-off-by: jiefenghuang --- cmd/gc.go | 7 ++++++- pkg/meta/redis.go | 42 ++++++++++++++++++++++++------------------ pkg/meta/sql.go | 24 +++++++++++++++--------- pkg/meta/tkv.go | 21 ++++++++++++--------- 4 files changed, 57 insertions(+), 37 deletions(-) diff --git a/cmd/gc.go b/cmd/gc.go index 8242d080ad97..165a88a59d04 100644 --- a/cmd/gc.go +++ b/cmd/gc.go @@ -70,6 +70,11 @@ $ juicefs gc redis://localhost --delete`, Value: 10, Usage: "number threads to delete leaked objects", }, + &cli.BoolFlag{ + Name: "scan-dangling", + Value: false, + Usage: "Scan dangling objects (nodes/edges...), better in read-only mode (time-intensive).", + }, }, } } @@ -196,7 +201,7 @@ func gc(ctx *cli.Context) error { // List all slices in metadata engine slices := make(map[meta.Ino][]meta.Slice) - r := m.ListSlices(c, slices, true, true, delete, sliceCSpin.Increment) + r := m.ListSlices(c, slices, true, ctx.Bool("scan-dangling"), delete, sliceCSpin.Increment) if r != 0 { logger.Fatalf("list all slices: %s", r) } diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index bec71ab086e7..c16fabac5d89 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -3058,9 +3058,9 @@ func (m *redisMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration Name string } var ctx = Background() - var foundInodes = make(map[Ino]*entry) - foundInodes[RootInode] = &entry{Parent: RootInode, Name: "/"} - foundInodes[TrashInode] = &entry{Parent: RootInode, Name: ".trash"} + var node2Edges = make(map[Ino][]*entry) + node2Edges[RootInode] = []*entry{{Parent: RootInode, Name: "/"}} + node2Edges[TrashInode] = []*entry{{Parent: RootInode, Name: ".trash"}} cutoff := time.Now().Add(-before) prefix := len(m.prefix) @@ -3074,7 +3074,7 @@ func (m *redisMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration return eno } for _, e := range entries { - foundInodes[e.Inode] = &entry{Parent: Ino(ino), Name: string(e.Name)} + node2Edges[e.Inode] = append(node2Edges[e.Inode], &entry{Parent: Ino(ino), Name: string(e.Name)}) } } return nil @@ -3096,7 +3096,7 @@ func (m *redisMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration var attr Attr m.parseAttr([]byte(v.(string)), &attr) ino, _ := strconv.Atoi(keys[i][prefix+1:]) - if _, ok := foundInodes[Ino(ino)]; !ok && time.Unix(attr.Ctime, 0).Before(cutoff) { + if _, ok := node2Edges[Ino(ino)]; !ok && time.Unix(attr.Ctime, 0).Before(cutoff) { logger.Infof("found leaded inode: %d %+v", ino, attr) if clean { err = m.doDeleteSustainedInode(0, Ino(ino)) @@ -3105,7 +3105,7 @@ func (m *redisMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration } } } - foundInodes[Ino(ino)] = nil + node2Edges[Ino(ino)] = nil } return nil }); err != nil { @@ -3113,19 +3113,25 @@ func (m *redisMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration return } - foundInodes[RootInode], foundInodes[TrashInode] = nil, nil - pipe := m.rdb.Pipeline() - for c, e := range foundInodes { - if e != nil { - logger.Infof("found leaked edge %d -> (%d, %s)", e.Parent, c, e.Name) - if clean { - pipe.HDel(ctx, m.entryKey(e.Parent), e.Name) + node2Edges[RootInode], node2Edges[TrashInode] = nil, nil + for c, es := range node2Edges { + if err := m.txn(ctx, func(tx *redis.Tx) error { + // double check + if err := tx.Get(ctx, m.inodeKey(c)).Err(); err == nil { + return nil + } else if err != redis.Nil { + return err } - } - } - if pipe.Len() > 0 { - if _, err := pipe.Exec(ctx); err != nil { - logger.Errorf("delete leaked edges: %s", err) + for _, e := range es { + logger.Infof("found leaked edge %d -> (%d, %s)", e.Parent, c, e.Name) + if clean { + tx.HDel(ctx, m.entryKey(e.Parent), e.Name) + } + } + return nil + }, m.inodeKey(c)); err != nil { + logger.Errorf("delete leaked edges %d: %s", c, err) + return } } } diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index d3b0a4619203..3f4582cce6df 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -3154,9 +3154,9 @@ func (m *dbMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration) { Parent Ino Name []byte } - var foundInodes = make(map[Ino]*entry) - foundInodes[RootInode] = &entry{Parent: RootInode, Name: []byte("/")} - foundInodes[TrashInode] = &entry{Parent: RootInode, Name: []byte(".trash")} + var node2Edges = make(map[Ino][]*entry) + node2Edges[RootInode] = []*entry{{Parent: RootInode, Name: []byte("/")}} + node2Edges[TrashInode] = []*entry{{Parent: RootInode, Name: []byte(".trash")}} cutoff := time.Now().Add(-before) var edges []edge @@ -3166,7 +3166,7 @@ func (m *dbMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration) { return err } for _, edge := range edges { - foundInodes[edge.Inode] = &entry{Parent: edge.Parent, Name: edge.Name} + node2Edges[edge.Inode] = append(node2Edges[edge.Inode], &entry{Parent: edge.Parent, Name: edge.Name}) } if err := s.Find(&nodes); err != nil { return err @@ -3178,7 +3178,7 @@ func (m *dbMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration) { } for _, node := range nodes { - if _, ok := foundInodes[node.Inode]; !ok && time.Unix(node.Ctime/1e6, 0).Before(cutoff) { + if _, ok := node2Edges[node.Inode]; !ok && time.Unix(node.Ctime/1e6, 0).Before(cutoff) { logger.Infof("found leaded inode: %d %+v", node.Inode, node) if clean { err := m.doDeleteSustainedInode(0, node.Inode) @@ -3187,13 +3187,19 @@ func (m *dbMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration) { } } } - foundInodes[node.Inode] = nil + node2Edges[node.Inode] = nil } - foundInodes[RootInode], foundInodes[TrashInode] = nil, nil + node2Edges[RootInode], node2Edges[TrashInode] = nil, nil if err := m.txn(func(s *xorm.Session) error { - for c, e := range foundInodes { - if e != nil { + for c, es := range node2Edges { + n := node{Inode: c} + if exist, err := s.Get(&n); err != nil { + return err + } else if exist { + continue + } + for _, e := range es { logger.Infof("found leaked edge %d -> (%d, %s)", e.Parent, c, e.Name) if clean { if _, err := s.Delete(&edge{Parent: e.Parent, Name: e.Name}); err != nil { diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 86a13801bb70..f7d21e6fc9e4 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -2444,9 +2444,9 @@ func (m *kvMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration) { Parent Ino Name []byte } - var foundInodes = make(map[Ino]*entry) - foundInodes[RootInode] = &entry{Parent: RootInode, Name: []byte("/")} - foundInodes[TrashInode] = &entry{Parent: RootInode, Name: []byte(".trash")} + var node2Edges = make(map[Ino][]*entry) + node2Edges[RootInode] = []*entry{{Parent: RootInode, Name: []byte("/")}} + node2Edges[TrashInode] = []*entry{{Parent: RootInode, Name: []byte(".trash")}} cutoff := time.Now().Add(-before) nodes := make(map[Ino]*Attr) @@ -2461,7 +2461,7 @@ func (m *kvMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration) { case 'D': name := string(key[10:]) _, inode := m.parseEntry(value) - foundInodes[inode] = &entry{Parent: ino, Name: []byte(name)} + node2Edges[inode] = append(node2Edges[inode], &entry{Parent: ino, Name: []byte(name)}) } } }); err != nil { @@ -2470,7 +2470,7 @@ func (m *kvMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration) { } for inode, attr := range nodes { - if _, ok := foundInodes[inode]; !ok && time.Unix(attr.Ctime, 0).Before(cutoff) { + if _, ok := node2Edges[inode]; !ok && time.Unix(attr.Ctime, 0).Before(cutoff) { logger.Infof("found leaded inode: %d %+v", inode, attr) if clean { err := m.doDeleteSustainedInode(0, inode) @@ -2479,13 +2479,16 @@ func (m *kvMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration) { } } } - foundInodes[inode] = nil + node2Edges[inode] = nil } - foundInodes[RootInode], foundInodes[TrashInode] = nil, nil + node2Edges[RootInode], node2Edges[TrashInode] = nil, nil if err := m.client.txn(Background(), func(tx *kvTxn) error { - for c, e := range foundInodes { - if e != nil { + for c, es := range node2Edges { + if tx.get(m.inodeKey(c)) != nil { + continue + } + for _, e := range es { logger.Infof("found leaked edge %d -> (%d, %s)", e.Parent, c, e.Name) if clean { tx.delete(m.entryKey(e.Parent, string(e.Name)))