Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/fsck.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func fsck(ctx *cli.Context) error {
// List all slices in metadata engine
sliceCSpin := progress.AddCountSpinner("Listed slices")
slices := make(map[meta.Ino][]meta.Slice)
r := m.ListSlices(c, slices, false, false, sliceCSpin.Increment)
r := m.ListSlices(c, slices, false, false, false, sliceCSpin.Increment)
if r != 0 {
logger.Fatalf("list all slices: %s", r)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func gc(ctx *cli.Context) error {

// List all slices in metadata engine
slices := make(map[meta.Ino][]meta.Slice)
r := m.ListSlices(c, slices, true, delete, sliceCSpin.Increment)
r := m.ListSlices(c, slices, true, true, delete, sliceCSpin.Increment)
if r != 0 {
logger.Fatalf("list all slices: %s", r)
}
Expand Down
93 changes: 90 additions & 3 deletions pkg/meta/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func testMeta(t *testing.T, m Meta) {
}

testMetaClient(t, m)
testCleanLeakedObjects(t, m)
testTruncateAndDelete(t, m)
testTrash(t, m)
testParents(t, m)
Expand Down Expand Up @@ -1491,7 +1492,7 @@ func testCompaction(t *testing.T, m Meta, trash bool) {
}
p.Done()
sliceMap := make(map[Ino][]Slice)
if st := m.ListSlices(ctx, sliceMap, false, false, nil); st != 0 {
if st := m.ListSlices(ctx, sliceMap, false, false, false, nil); st != 0 {
t.Fatalf("list all slices: %s", st)
}

Expand Down Expand Up @@ -1700,7 +1701,7 @@ func testTruncateAndDelete(t *testing.T, m Meta) {
}
var total int64
slices := make(map[Ino][]Slice)
m.ListSlices(ctx, slices, false, false, func() { total++ })
m.ListSlices(ctx, slices, false, false, false, func() { total++ })
var totalSlices int
for _, ss := range slices {
totalSlices += len(ss)
Expand All @@ -1715,7 +1716,7 @@ func testTruncateAndDelete(t *testing.T, m Meta) {

time.Sleep(time.Millisecond * 100)
slices = make(map[Ino][]Slice)
m.ListSlices(ctx, slices, false, false, nil)
m.ListSlices(ctx, slices, false, false, false, nil)
totalSlices = 0
for _, ss := range slices {
totalSlices += len(ss)
Expand Down Expand Up @@ -3284,6 +3285,92 @@ func TestSymlinkCache(t *testing.T) {
require.Equal(t, int32(8000), cache.size.Load())
}

// testCleanLeakedObjects verifies that cleanupLeakedInodesAndEdges removes
// leaked metadata objects: it creates a directory with 5 files, then manually
// deletes cnodes[0]'s edge (leaving a leaked inode) and cnodes[1]'s node
// (leaving a leaked edge), runs the cleanup, and checks that both the dangling
// inode and the dangling edge are gone while the other entries survive.
// Backends without a case below are skipped.
func testCleanLeakedObjects(t *testing.T, m Meta) {
	if err := m.Init(testFormat(), false); err != nil {
		t.Fatalf("init error: %s", err)
	}

	ctx := Background()
	var parent Ino
	var pattr Attr
	pname := "d1"
	if st := m.Mkdir(ctx, RootInode, pname, 0755, 0, 0, &parent, &pattr); st != 0 {
		t.Fatalf("Mkdir %s: %s", pname, st)
	}
	defer m.Rmdir(ctx, RootInode, pname)

	num := 5
	cnames := make([]string, num)
	cnodes := make([]Ino, num)
	cattr := make([]Attr, num)
	for i := 0; i < num; i++ {
		cnames[i] = fmt.Sprintf("f%d", i+1)
		if st := m.Mknod(ctx, parent, cnames[i], TypeFile, 0755, 0, 0, "", &cnodes[i], &cattr[i]); st != 0 {
			t.Fatalf("Mknod %s: %s", cnames[i], st)
		}
		defer m.Unlink(ctx, parent, cnames[i])
	}

	// Make sure the files' ctime is strictly before the cleanup cutoff (before=0).
	time.Sleep(time.Second)
	// Simulate leaks: delete cnodes[0]'s edge and cnodes[1]'s node.
	switch m.Name() {
	case "redis":
		meta := m.getBase().en.(*redisMeta)
		require.Nil(t, meta.rdb.HDel(ctx, meta.entryKey(parent), cnames[0]).Err())
		require.Nil(t, meta.rdb.Del(ctx, meta.inodeKey(cnodes[1])).Err())
		meta.cleanupLeakedInodesAndEdges(true, 0)

		// cnodes[0]'s inode and cnodes[1]'s edge should have been cleaned up.
		require.Equal(t, redis.Nil, meta.rdb.Get(ctx, meta.inodeKey(cnodes[0])).Err())
		require.Equal(t, redis.Nil, meta.rdb.HGet(ctx, meta.entryKey(parent), cnames[1]).Err())
	case "badger":
		meta := m.getBase().en.(*kvMeta)
		// Don't ignore transaction errors: a failed delete would make the
		// assertions below meaningless.
		require.Nil(t, meta.client.txn(Background(), func(tx *kvTxn) error {
			tx.delete(meta.entryKey(parent, cnames[0]))
			tx.delete(meta.inodeKey(cnodes[1]))
			return nil
		}, 0))
		meta.cleanupLeakedInodesAndEdges(true, 0)
		require.Nil(t, meta.client.txn(Background(), func(tx *kvTxn) error {
			require.Nil(t, tx.get(meta.inodeKey(cnodes[0])))
			require.Nil(t, tx.get(meta.entryKey(parent, cnames[1])))
			return nil
		}, 0))
	case "sqlite3":
		meta := m.getBase().en.(*dbMeta)
		_, err := meta.db.Delete(&edge{Parent: parent, Name: []byte(cnames[0])})
		require.Nil(t, err)
		_, err = meta.db.Delete(&node{Inode: cnodes[1]})
		require.Nil(t, err)
		meta.cleanupLeakedInodesAndEdges(true, 0)
		ok, err := meta.db.Exist(&node{Inode: cnodes[0]})
		require.Nil(t, err)
		require.False(t, ok)
		ok, err = meta.db.Exist(&edge{Parent: parent, Name: []byte(cnames[1])})
		require.Nil(t, err)
		require.False(t, ok)
	default:
		return
	}

	// num entries remain: ".", "..", and the three untouched files.
	var entries []*Entry
	if st := m.Readdir(ctx, parent, 0, &entries); st != 0 {
		t.Fatalf("Readdir %s: %s", pname, st)
	}
	require.Equal(t, num, len(entries))
	expected := map[Ino]bool{
		RootInode: true, // ".."
		parent:    true, // "."
		cnodes[2]: true,
		cnodes[3]: true,
		cnodes[4]: true,
	}
	for i := 0; i < num; i++ {
		if _, ok := expected[entries[i].Inode]; !ok {
			t.Fatalf("Unexpected entry %s", entries[i].Name)
		}
	}
}

func TestTxBatchLock(t *testing.T) {
var base baseMeta
// 0 inode
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ type Meta interface {
Compact(ctx Context, inode Ino, concurrency int, preFunc, postFunc func()) syscall.Errno

// ListSlices returns all slices used by all files.
ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, delete bool, showProgress func()) syscall.Errno
ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, scanLeaked, delete bool, showProgress func()) syscall.Errno
// Remove all files and directories recursively.
// count represents the number of attempted deletions of entries (even if failed).
Remove(ctx Context, parent Ino, name string, skipTrash bool, numThreads int, count *uint64) syscall.Errno
Expand Down
60 changes: 45 additions & 15 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3052,15 +3052,19 @@ func (m *redisMeta) scanAllChunks(ctx Context, ch chan<- cchunk, bar *utils.Bar)
})
}

func (m *redisMeta) cleanupLeakedInodes(delete bool) {
func (m *redisMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration) {
type entry struct {
Parent Ino
Name string
}
var ctx = Background()
var foundInodes = make(map[Ino]struct{})
foundInodes[RootInode] = struct{}{}
foundInodes[TrashInode] = struct{}{}
cutoff := time.Now().Add(time.Hour * -1)
var foundInodes = make(map[Ino]*entry)
foundInodes[RootInode] = &entry{Parent: RootInode, Name: "/"}
foundInodes[TrashInode] = &entry{Parent: RootInode, Name: ".trash"}
cutoff := time.Now().Add(-before)
prefix := len(m.prefix)

_ = m.scan(ctx, "d[0-9]*", func(keys []string) error {
if err := m.scan(ctx, "d[0-9]*", func(keys []string) error {
for _, key := range keys {
ino, _ := strconv.Atoi(key[prefix+1:])
var entries []*Entry
Expand All @@ -3070,12 +3074,16 @@ func (m *redisMeta) cleanupLeakedInodes(delete bool) {
return eno
}
for _, e := range entries {
foundInodes[e.Inode] = struct{}{}
foundInodes[e.Inode] = &entry{Parent: Ino(ino), Name: string(e.Name)}
}
}
return nil
})
_ = m.scan(ctx, "i*", func(keys []string) error {
}); err != nil {
logger.Errorf("scan directories: %s", err)
return
}

if err := m.scan(ctx, "i*", func(keys []string) error {
values, err := m.rdb.MGet(ctx, keys...).Result()
if err != nil {
logger.Warnf("mget inodes: %s", err)
Expand All @@ -3089,17 +3097,37 @@ func (m *redisMeta) cleanupLeakedInodes(delete bool) {
m.parseAttr([]byte(v.(string)), &attr)
ino, _ := strconv.Atoi(keys[i][prefix+1:])
if _, ok := foundInodes[Ino(ino)]; !ok && time.Unix(attr.Ctime, 0).Before(cutoff) {
logger.Infof("found dangling inode: %s %+v", keys[i], attr)
if delete {
logger.Infof("found leaded inode: %d %+v", ino, attr)
if clean {
err = m.doDeleteSustainedInode(0, Ino(ino))
if err != nil {
logger.Errorf("delete leaked inode %d : %s", ino, err)
}
}
}
foundInodes[Ino(ino)] = nil
}
return nil
})
}); err != nil {
logger.Errorf("scan inodes: %s", err)
return
}

foundInodes[RootInode], foundInodes[TrashInode] = nil, nil
pipe := m.rdb.Pipeline()
for c, e := range foundInodes {
if e != nil {
logger.Infof("found leaked edge %d -> (%d, %s)", e.Parent, c, e.Name)
if clean {
pipe.HDel(ctx, m.entryKey(e.Parent), e.Name)
}
}
}
if pipe.Len() > 0 {
if _, err := pipe.Exec(ctx); err != nil {
logger.Errorf("delete leaked edges: %s", err)
}
}
}

func (m *redisMeta) scan(ctx context.Context, pattern string, f func([]string) error) error {
Expand Down Expand Up @@ -3155,9 +3183,11 @@ func (m *redisMeta) hscan(ctx context.Context, key string, f func([]string) erro
return nil
}

func (m *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, delete bool, showProgress func()) syscall.Errno {
m.cleanupLeakedInodes(delete)
m.cleanupLeakedChunks(delete)
func (m *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, scanLeaked, delete bool, showProgress func()) syscall.Errno {
if scanLeaked {
m.cleanupLeakedInodesAndEdges(delete, time.Hour)
m.cleanupLeakedChunks(delete)
}
m.cleanupOldSliceRefs(delete)
if delete {
m.doCleanupSlices()
Expand Down
65 changes: 64 additions & 1 deletion pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -3149,7 +3149,70 @@ func (m *dbMeta) scanAllChunks(ctx Context, ch chan<- cchunk, bar *utils.Bar) er
})
}

func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, delete bool, showProgress func()) syscall.Errno {
func (m *dbMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration) {
type entry struct {
Parent Ino
Name []byte
}
var foundInodes = make(map[Ino]*entry)
foundInodes[RootInode] = &entry{Parent: RootInode, Name: []byte("/")}
foundInodes[TrashInode] = &entry{Parent: RootInode, Name: []byte(".trash")}
cutoff := time.Now().Add(-before)

var edges []edge
var nodes []node
if err := m.txn(func(s *xorm.Session) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

roTxn

if err := s.Find(&edges); err != nil {
return err
}
for _, edge := range edges {
foundInodes[edge.Inode] = &entry{Parent: edge.Parent, Name: edge.Name}
}
if err := s.Find(&nodes); err != nil {
return err
}
return nil
}); err != nil {
logger.Errorf("scan all edges and nodes: %s", err)
return
}

for _, node := range nodes {
if _, ok := foundInodes[node.Inode]; !ok && time.Unix(node.Ctime/1e6, 0).Before(cutoff) {
logger.Infof("found leaded inode: %d %+v", node.Inode, node)
if clean {
err := m.doDeleteSustainedInode(0, node.Inode)
if err != nil {
logger.Errorf("delete leaked inode %d : %s", node.Inode, err)
}
}
}
foundInodes[node.Inode] = nil
}

foundInodes[RootInode], foundInodes[TrashInode] = nil, nil
if err := m.txn(func(s *xorm.Session) error {
for c, e := range foundInodes {
if e != nil {
logger.Infof("found leaked edge %d -> (%d, %s)", e.Parent, c, e.Name)
if clean {
if _, err := s.Delete(&edge{Parent: e.Parent, Name: e.Name}); err != nil {
return err
}
}
}
}
return nil
}); err != nil {
logger.Errorf("delete leaked edges: %s", err)
return
}
}

func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, scanLeaked, delete bool, showProgress func()) syscall.Errno {
if scanLeaked {
m.cleanupLeakedInodesAndEdges(delete, time.Hour)
}
if delete {
m.doCleanupSlices()
}
Expand Down
Loading
Loading