2 changes: 1 addition & 1 deletion cmd/fsck.go
@@ -133,7 +133,7 @@ func fsck(ctx *cli.Context) error {
// List all slices in metadata engine
sliceCSpin := progress.AddCountSpinner("Listed slices")
slices := make(map[meta.Ino][]meta.Slice)
r := m.ListSlices(c, slices, false, false, sliceCSpin.Increment)
r := m.ListSlices(c, slices, false, false, false, sliceCSpin.Increment)
if r != 0 {
logger.Fatalf("list all slices: %s", r)
}
7 changes: 6 additions & 1 deletion cmd/gc.go
@@ -70,6 +70,11 @@ $ juicefs gc redis://localhost --delete`,
Value: 10,
Usage: "number threads to delete leaked objects",
},
&cli.BoolFlag{
Name: "scan-dangling",
Value: false,
Usage: "Scan dangling objects (nodes/edges...), better in read-only mode (time-intensive).",
},
},
}
}
@@ -196,7 +201,7 @@ func gc(ctx *cli.Context) error {

// List all slices in metadata engine
slices := make(map[meta.Ino][]meta.Slice)
r := m.ListSlices(c, slices, true, delete, sliceCSpin.Increment)
r := m.ListSlices(c, slices, true, ctx.Bool("scan-dangling"), delete, sliceCSpin.Increment)
if r != 0 {
logger.Fatalf("list all slices: %s", r)
}
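
With the flag wired into ListSlices as above, a gc run can opt in to the dangling-object scan. A minimal usage sketch, reusing the Redis address from the command's help text; combining with --delete only after a read-only pass is an assumption following the flag's own usage string:

$ juicefs gc redis://localhost --scan-dangling
$ juicefs gc redis://localhost --scan-dangling --delete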
93 changes: 90 additions & 3 deletions pkg/meta/base_test.go
@@ -124,6 +124,7 @@ func testMeta(t *testing.T, m Meta) {
}

testMetaClient(t, m)
testCleanLeakedObjects(t, m)
testTruncateAndDelete(t, m)
testTrash(t, m)
testParents(t, m)
@@ -1491,7 +1492,7 @@ func testCompaction(t *testing.T, m Meta, trash bool) {
}
p.Done()
sliceMap := make(map[Ino][]Slice)
if st := m.ListSlices(ctx, sliceMap, false, false, nil); st != 0 {
if st := m.ListSlices(ctx, sliceMap, false, false, false, nil); st != 0 {
t.Fatalf("list all slices: %s", st)
}

@@ -1700,7 +1701,7 @@ func testTruncateAndDelete(t *testing.T, m Meta) {
}
var total int64
slices := make(map[Ino][]Slice)
m.ListSlices(ctx, slices, false, false, func() { total++ })
m.ListSlices(ctx, slices, false, false, false, func() { total++ })
var totalSlices int
for _, ss := range slices {
totalSlices += len(ss)
@@ -1715,7 +1716,7 @@

time.Sleep(time.Millisecond * 100)
slices = make(map[Ino][]Slice)
m.ListSlices(ctx, slices, false, false, nil)
m.ListSlices(ctx, slices, false, false, false, nil)
totalSlices = 0
for _, ss := range slices {
totalSlices += len(ss)
@@ -3284,6 +3285,92 @@ func TestSymlinkCache(t *testing.T) {
require.Equal(t, int32(8000), cache.size.Load())
}

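// testCleanLeakedObjects removes one edge (for cnodes[0]) and one node (for
// cnodes[1]) behind the engine's back, runs cleanupLeakedInodesAndEdges, and
// verifies that the orphaned node and the orphaned edge are cleaned up while
// the untouched entries survive.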
func testCleanLeakedObjects(t *testing.T, m Meta) {
if err := m.Init(testFormat(), false); err != nil {
t.Fatalf("init error: %s", err)
}

ctx := Background()
var parent Ino
var pattr Attr
pname := "d1"
if st := m.Mkdir(ctx, RootInode, pname, 0755, 0, 0, &parent, &pattr); st != 0 {
t.Fatalf("Mkdir %s: %s", pname, st)
}
defer m.Rmdir(ctx, RootInode, pname)

num := 5
cnames := make([]string, num)
cnodes := make([]Ino, num)
cattr := make([]Attr, num)
for i := 0; i < num; i++ {
cnames[i] = fmt.Sprintf("f%d", i+1)
if st := m.Mknod(ctx, parent, cnames[i], TypeFile, 0755, 0, 0, "", &cnodes[i], &cattr[i]); st != 0 {
t.Fatalf("Mknod %s: %s", cnames[i], st)
}
defer m.Unlink(ctx, parent, cnames[i])
}

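// wait so the files' ctimes fall before the cleanup cutoff (before=0 means "now")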
time.Sleep(time.Second)
// delete cnodes[0]'s edge and cnodes[1]'s node
switch m.Name() {
case "redis":
meta := m.getBase().en.(*redisMeta)
require.Nil(t, meta.rdb.HDel(ctx, meta.entryKey(parent), cnames[0]).Err())
require.Nil(t, meta.rdb.Del(ctx, meta.inodeKey(cnodes[1])).Err())
meta.cleanupLeakedInodesAndEdges(true, 0)

require.Equal(t, redis.Nil, meta.rdb.Get(ctx, meta.inodeKey(cnodes[0])).Err())
require.Equal(t, redis.Nil, meta.rdb.HGet(ctx, meta.entryKey(parent), cnames[1]).Err())
case "badger":
meta := m.getBase().en.(*kvMeta)
meta.client.txn(Background(), func(tx *kvTxn) error {
tx.delete(meta.entryKey(parent, cnames[0]))
tx.delete(meta.inodeKey(cnodes[1]))
return nil
}, 0)
meta.cleanupLeakedInodesAndEdges(true, 0)
meta.client.txn(Background(), func(tx *kvTxn) error {
require.Nil(t, tx.get(meta.inodeKey(cnodes[0])))
require.Nil(t, tx.get(meta.entryKey(parent, cnames[1])))
return nil
}, 0)
case "sqlite3":
meta := m.getBase().en.(*dbMeta)
_, err := meta.db.Delete(&edge{Parent: parent, Name: []byte(cnames[0])})
require.Nil(t, err)
_, err = meta.db.Delete(&node{Inode: cnodes[1]})
require.Nil(t, err)
meta.cleanupLeakedInodesAndEdges(true, 0)
ok, err := meta.db.Exist(&node{Inode: cnodes[0]})
require.Nil(t, err)
require.False(t, ok)
ok, err = meta.db.Exist(&edge{Parent: parent, Name: []byte(cnames[1])})
require.Nil(t, err)
require.False(t, ok)
default:
return
}

var entries []*Entry
if st := m.Readdir(ctx, parent, 0, &entries); st != 0 {
t.Fatalf("Readdir %s: %s", pname, st)
}
require.Equal(t, num, len(entries))
expected := map[Ino]bool{
RootInode: true,
parent: true,
cnodes[2]: true,
cnodes[3]: true,
cnodes[4]: true,
}
for i := 0; i < num; i++ {
if _, ok := expected[entries[i].Inode]; !ok {
t.Fatalf("Unexpected entry %s", entries[i].Name)
}
}
}

func TestTxBatchLock(t *testing.T) {
var base baseMeta
// 0 inode
2 changes: 1 addition & 1 deletion pkg/meta/interface.go
@@ -489,7 +489,7 @@ type Meta interface {
Compact(ctx Context, inode Ino, concurrency int, preFunc, postFunc func()) syscall.Errno

// ListSlices returns all slices used by all files.
ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, delete bool, showProgress func()) syscall.Errno
ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, scanLeaked, delete bool, showProgress func()) syscall.Errno
// Remove all files and directories recursively.
// count represents the number of attempted deletions of entries (even if failed).
Remove(ctx Context, parent Ino, name string, skipTrash bool, numThreads int, count *uint64) syscall.Errno
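
A minimal caller sketch for the widened signature, mirroring the cmd/fsck.go change above; ctx, m, and logger are assumed from the surrounding code:

slices := make(map[Ino][]Slice)
// scanPending=false, scanLeaked=false, delete=false: a plain listing pass
if st := m.ListSlices(ctx, slices, false, false, false, nil); st != 0 {
logger.Fatalf("list all slices: %s", st)
}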
68 changes: 52 additions & 16 deletions pkg/meta/redis.go
@@ -3052,15 +3052,19 @@ func (m *redisMeta) scanAllChunks(ctx Context, ch chan<- cchunk, bar *utils.Bar)
})
}

func (m *redisMeta) cleanupLeakedInodes(delete bool) {
func (m *redisMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration) {
type entry struct {
Parent Ino
Name string
}
var ctx = Background()
var foundInodes = make(map[Ino]struct{})
foundInodes[RootInode] = struct{}{}
foundInodes[TrashInode] = struct{}{}
cutoff := time.Now().Add(time.Hour * -1)
var node2Edges = make(map[Ino][]*entry)
node2Edges[RootInode] = []*entry{{Parent: RootInode, Name: "/"}}
node2Edges[TrashInode] = []*entry{{Parent: RootInode, Name: ".trash"}}
cutoff := time.Now().Add(-before)
prefix := len(m.prefix)

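// First pass: walk every directory key and record, per child inode, the
// edges that reference it.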
_ = m.scan(ctx, "d[0-9]*", func(keys []string) error {
if err := m.scan(ctx, "d[0-9]*", func(keys []string) error {
for _, key := range keys {
ino, _ := strconv.Atoi(key[prefix+1:])
var entries []*Entry
Expand All @@ -3070,12 +3074,16 @@ func (m *redisMeta) cleanupLeakedInodes(delete bool) {
return eno
}
for _, e := range entries {
foundInodes[e.Inode] = struct{}{}
node2Edges[e.Inode] = append(node2Edges[e.Inode], &entry{Parent: Ino(ino), Name: string(e.Name)})
}
}
return nil
})
_ = m.scan(ctx, "i*", func(keys []string) error {
}); err != nil {
logger.Errorf("scan directories: %s", err)
return
}

if err := m.scan(ctx, "i*", func(keys []string) error {
values, err := m.rdb.MGet(ctx, keys...).Result()
if err != nil {
logger.Warnf("mget inodes: %s", err)
@@ -3088,18 +3096,44 @@
var attr Attr
m.parseAttr([]byte(v.(string)), &attr)
ino, _ := strconv.Atoi(keys[i][prefix+1:])
if _, ok := foundInodes[Ino(ino)]; !ok && time.Unix(attr.Ctime, 0).Before(cutoff) {
logger.Infof("found dangling inode: %s %+v", keys[i], attr)
if delete {
if _, ok := node2Edges[Ino(ino)]; !ok && time.Unix(attr.Ctime, 0).Before(cutoff) {
logger.Infof("found leaded inode: %d %+v", ino, attr)
if clean {
err = m.doDeleteSustainedInode(0, Ino(ino))
if err != nil {
logger.Errorf("delete leaked inode %d : %s", ino, err)
}
}
}
node2Edges[Ino(ino)] = nil
}
return nil
})
}); err != nil {
logger.Errorf("scan inodes: %s", err)
return
}

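// Second pass over edges: every key left in node2Edges either maps an
// existing inode to nil or a missing inode to its dangling edges; re-check
// inode existence inside a transaction before dropping those edges.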
node2Edges[RootInode], node2Edges[TrashInode] = nil, nil
for c, es := range node2Edges {
if err := m.txn(ctx, func(tx *redis.Tx) error {
// double check
if err := tx.Get(ctx, m.inodeKey(c)).Err(); err == nil {
return nil
} else if err != redis.Nil {
return err
}
for _, e := range es {
logger.Infof("found leaked edge %d -> (%d, %s)", e.Parent, c, e.Name)
if clean {
tx.HDel(ctx, m.entryKey(e.Parent), e.Name)
}
}
return nil
}, m.inodeKey(c)); err != nil {
logger.Errorf("delete leaked edges %d: %s", c, err)
return
}
}
}

func (m *redisMeta) scan(ctx context.Context, pattern string, f func([]string) error) error {
@@ -3155,9 +3189,11 @@ func (m *redisMeta) hscan(ctx context.Context, key string, f func([]string) erro
return nil
}

func (m *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, delete bool, showProgress func()) syscall.Errno {
m.cleanupLeakedInodes(delete)
m.cleanupLeakedChunks(delete)
func (m *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, scanLeaked, delete bool, showProgress func()) syscall.Errno {
if scanLeaked {
m.cleanupLeakedInodesAndEdges(delete, time.Hour)
m.cleanupLeakedChunks(delete)
}
m.cleanupOldSliceRefs(delete)
if delete {
m.doCleanupSlices()
71 changes: 70 additions & 1 deletion pkg/meta/sql.go
@@ -3149,7 +3149,76 @@ func (m *dbMeta) scanAllChunks(ctx Context, ch chan<- cchunk, bar *utils.Bar) er
})
}

func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, delete bool, showProgress func()) syscall.Errno {
func (m *dbMeta) cleanupLeakedInodesAndEdges(clean bool, before time.Duration) {
type entry struct {
Parent Ino
Name []byte
}
var node2Edges = make(map[Ino][]*entry)
node2Edges[RootInode] = []*entry{{Parent: RootInode, Name: []byte("/")}}
node2Edges[TrashInode] = []*entry{{Parent: RootInode, Name: []byte(".trash")}}
cutoff := time.Now().Add(-before)

var edges []edge
var nodes []node
if err := m.txn(func(s *xorm.Session) error {
[Review comment (Contributor) on this line: roTxn]
if err := s.Find(&edges); err != nil {
return err
}
for _, edge := range edges {
node2Edges[edge.Inode] = append(node2Edges[edge.Inode], &entry{Parent: edge.Parent, Name: edge.Name})
}
if err := s.Find(&nodes); err != nil {
return err
}
return nil
}); err != nil {
logger.Errorf("scan all edges and nodes: %s", err)
return
}

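// Every node row marks its inode as present; an inode that owns a node row
// but never appeared in node2Edges (and is old enough) is a leaked inode.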
for _, node := range nodes {
if _, ok := node2Edges[node.Inode]; !ok && time.Unix(node.Ctime/1e6, 0).Before(cutoff) {
logger.Infof("found leaded inode: %d %+v", node.Inode, node)
if clean {
err := m.doDeleteSustainedInode(0, node.Inode)
if err != nil {
logger.Errorf("delete leaked inode %d : %s", node.Inode, err)
}
}
}
node2Edges[node.Inode] = nil
}

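// Remaining keys without a node row are missing inodes; their edges are
// leaked. Double-check inside one transaction before deleting them.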
node2Edges[RootInode], node2Edges[TrashInode] = nil, nil
if err := m.txn(func(s *xorm.Session) error {
for c, es := range node2Edges {
n := node{Inode: c}
if exist, err := s.Get(&n); err != nil {
return err
} else if exist {
continue
}
for _, e := range es {
logger.Infof("found leaked edge %d -> (%d, %s)", e.Parent, c, e.Name)
if clean {
if _, err := s.Delete(&edge{Parent: e.Parent, Name: e.Name}); err != nil {
return err
}
}
}
}
return nil
}); err != nil {
logger.Errorf("delete leaked edges: %s", err)
return
}
}

func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, scanLeaked, delete bool, showProgress func()) syscall.Errno {
if scanLeaked {
m.cleanupLeakedInodesAndEdges(delete, time.Hour)
}
if delete {
m.doCleanupSlices()
}