diff --git a/pkg/meta/base.go b/pkg/meta/base.go
index 2d81b62efd26..ece2abbdbf95 100644
--- a/pkg/meta/base.go
+++ b/pkg/meta/base.go
@@ -54,6 +54,7 @@ type engine interface {
 	doFindDeletedFiles(ts int64, limit int) (map[Ino]uint64, error) // limit < 0 means all
 	doDeleteFileData(inode Ino, length uint64)
 	doCleanupSlices()
+	doCleanupDelayedSlices(edge int64, limit int) (int, error)
 	doDeleteSlice(chunkid uint64, size uint32) error
 
 	doGetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno
@@ -484,6 +485,22 @@ func (m *baseMeta) marshal(attr *Attr) []byte {
 	return w.Bytes()
 }
 
+func (m *baseMeta) encodeDelayedSlice(chunkid uint64, size uint32) []byte {
+	w := utils.NewBuffer(8 + 4)
+	w.Put64(chunkid)
+	w.Put32(size)
+	return w.Bytes()
+}
+
+func (m *baseMeta) decodeDelayedSlices(buf []byte, ss *[]Slice) {
+	if len(buf) == 0 || len(buf)%12 != 0 {
+		return
+	}
+	for rb := utils.FromBuffer(buf); rb.HasMore(); {
+		*ss = append(*ss, Slice{Chunkid: rb.Get64(), Size: rb.Get32()})
+	}
+}
+
 func clearSUGID(ctx Context, cur *Attr, set *Attr) {
 	switch runtime.GOOS {
 	case "darwin":
@@ -888,6 +905,7 @@ func (m *baseMeta) cleanupTrash() {
 		logger.Warnf("checking counter lastCleanupTrash: %s", err)
 	} else if ok {
 		go m.doCleanupTrash(false)
+		go m.cleanupDelayedSlices()
 	}
 }
 
@@ -907,6 +925,8 @@ func (m *baseMeta) doCleanupTrash(force bool) {
 	defer func() {
 		if count > 0 {
 			logger.Infof("cleanup trash: deleted %d files in %v", count, time.Since(now))
+		} else {
+			logger.Debugf("cleanup trash: nothing to delete")
 		}
 	}()
 
@@ -951,3 +971,19 @@ func (m *baseMeta) doCleanupTrash(force bool) {
 		}
 	}
 }
+
+func (m *baseMeta) cleanupDelayedSlices() {
+	now := time.Now()
+	edge := now.Unix() - int64(m.fmt.TrashDays)*24*3600
+	logger.Debugf("Cleanup delayed slices: started with edge %d", edge)
+	if count, err := m.en.doCleanupDelayedSlices(edge, 3e5); err == nil {
+		msg := fmt.Sprintf("Cleanup delayed slices: deleted %d slices in %v", count, time.Since(now))
+		if count >= 3e5 {
+			logger.Warnf("%s (reached max limit, stop)", msg)
+		} else {
+			logger.Debugf(msg)
+		}
+	} else {
+		logger.Warnf("Cleanup delayed slices: deleted %d slices in %v, but got error: %s", count, time.Since(now), err)
+	}
+}
diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go
index 11813c65fe1f..ff3bcbb19c55 100644
--- a/pkg/meta/redis.go
+++ b/pkg/meta/redis.go
@@ -501,6 +501,10 @@ func (r *redisMeta) delfiles() string {
 	return r.prefix + "delfiles"
 }
 
+func (r *redisMeta) delSlices() string {
+	return r.prefix + "delSlices"
+}
+
 func (r *redisMeta) allSessions() string {
 	return r.prefix + "allSessions"
 }
@@ -2244,6 +2248,70 @@ func (r *redisMeta) doDeleteFileData_(inode Ino, length uint64, tracking string)
 	_ = r.rdb.ZRem(ctx, r.delfiles(), tracking)
 }
 
+func (r *redisMeta) doCleanupDelayedSlices(edge int64, limit int) (int, error) {
+	ctx := Background
+	stop := fmt.Errorf("reach limit")
+	var count int
+	var ss []Slice
+	var rs []*redis.IntCmd
+	err := r.hscan(ctx, r.delSlices(), func(keys []string) error {
+		for i := 0; i < len(keys); i += 2 {
+			key := keys[i]
+			ps := strings.Split(key, "_")
+			if len(ps) != 2 {
+				logger.Warnf("Invalid key %s", key)
+				continue
+			}
+			if ts, e := strconv.ParseInt(ps[1], 10, 64); e != nil {
+				logger.Warnf("Invalid key %s", key)
+				continue
+			} else if ts >= edge {
+				continue
+			}
+
+			if err := r.txn(ctx, func(tx *redis.Tx) error {
+				val, e := tx.HGet(ctx, r.delSlices(), key).Result()
+				if e == redis.Nil {
+					return nil
+				} else if e != nil {
+					return e
+				}
+				ss, rs = ss[:0], rs[:0]
+				buf := []byte(val)
+				r.decodeDelayedSlices(buf, &ss)
+				if len(ss) == 0 {
+					return fmt.Errorf("invalid value for delSlices %s: %v", key, buf)
+				}
+				_, e = tx.Pipelined(ctx, func(pipe redis.Pipeliner) error {
+					for _, s := range ss {
+						rs = append(rs, pipe.HIncrBy(ctx, r.sliceRefs(), r.sliceKey(s.Chunkid, s.Size), -1))
+					}
+					pipe.HDel(ctx, r.delSlices(), key)
+					return nil
+				})
+				return e
+			}, r.delSlices()); err != nil {
+				logger.Warnf("Cleanup delSlices %s: %s", key, err)
+				continue
+			}
+			for i, s := range ss {
+				if rs[i].Err() == nil && rs[i].Val() < 0 {
+					r.deleteSlice(s.Chunkid, s.Size)
+					count++
+				}
+			}
+			if count >= limit {
+				return stop
+			}
+		}
+		return nil
+	})
+	if err == stop {
+		err = nil
+	}
+	return count, err
+}
+
 func (r *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
 	// avoid too many or duplicated compaction
 	if !force {
@@ -2289,7 +2357,14 @@ func (r *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
 		}
 		return
 	}
-	var rs []*redis.IntCmd
+	var buf []byte         // trash enabled: track delayed slices
+	var rs []*redis.IntCmd // trash disabled: check reference of slices
+	trash := r.toTrash(0)
+	if trash {
+		for _, s := range ss {
+			buf = append(buf, r.encodeDelayedSlice(s.chunkid, s.size)...)
+		}
+	}
 	key := r.chunkKey(inode, indx)
 	errno := errno(r.txn(ctx, func(tx *redis.Tx) error {
 		rs = nil
@@ -2313,8 +2388,12 @@ func (r *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
 			pipe.LPush(ctx, key, vals[i-1])
 		}
 		pipe.HSet(ctx, r.sliceRefs(), r.sliceKey(chunkid, size), "0") // create the key to tracking it
-		for _, s := range ss {
-			rs = append(rs, pipe.HIncrBy(ctx, r.sliceRefs(), r.sliceKey(s.chunkid, s.size), -1))
+		if trash {
+			pipe.HSet(ctx, r.delSlices(), fmt.Sprintf("%d_%d", chunkid, time.Now().Unix()), buf)
+		} else {
+			for _, s := range ss {
+				rs = append(rs, pipe.HIncrBy(ctx, r.sliceRefs(), r.sliceKey(s.chunkid, s.size), -1))
+			}
 		}
 		return nil
 	})
@@ -2336,9 +2415,11 @@ func (r *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
 	} else if errno == 0 {
 		r.of.InvalidateChunk(inode, indx)
 		r.cleanupZeroRef(r.sliceKey(chunkid, size))
-		for i, s := range ss {
-			if rs[i].Err() == nil && rs[i].Val() < 0 {
-				r.deleteSlice(s.chunkid, s.size)
+		if !trash {
+			for i, s := range ss {
+				if rs[i].Err() == nil && rs[i].Val() < 0 {
+					r.deleteSlice(s.chunkid, s.size)
+				}
 			}
 		}
 	} else {
@@ -2495,14 +2576,14 @@ func (r *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool,
 	}
 
 	p := r.rdb.Pipeline()
-	return errno(r.scan(ctx, "c*_*", func(keys []string) error {
+	err := r.scan(ctx, "c*_*", func(keys []string) error {
 		for _, key := range keys {
 			_ = p.LRange(ctx, key, 0, 100000000)
 		}
 		cmds, err := p.Exec(ctx)
 		if err != nil {
 			logger.Warnf("list slices: %s", err)
-			return errno(err)
+			return err
 		}
 		for _, cmd := range cmds {
 			key := cmd.(*redis.StringSliceCmd).Args()[1].(string)
@@ -2519,7 +2600,30 @@ func (r *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool,
 			}
 		}
 		return nil
-	}))
+	})
+	if err != nil {
+		return errno(err)
+	}
+
+	var ss []Slice
+	err = r.hscan(ctx, r.delSlices(), func(keys []string) error {
+		for i := 0; i < len(keys); i += 2 {
+			ss = ss[:0]
+			r.decodeDelayedSlices([]byte(keys[i+1]), &ss)
+			if showProgress != nil {
+				for range ss {
+					showProgress()
+				}
+			}
+			for _, s := range ss {
+				if s.Chunkid > 0 {
+					slices[1] = append(slices[1], s)
+				}
+			}
+		}
+		return nil
+	})
+	return errno(err)
 }
 
 func (r *redisMeta) GetXattr(ctx Context, inode Ino, name string, vbuff *[]byte) syscall.Errno {
diff --git a/pkg/meta/redis_test.go b/pkg/meta/redis_test.go
index 55311defe9f0..4c567801cbc6 100644
--- a/pkg/meta/redis_test.go
+++ b/pkg/meta/redis_test.go
@@ -69,7 +69,9 @@ func testMeta(t *testing.T, m Meta) {
 	testStickyBit(t, m)
 	testLocks(t, m)
 	testConcurrentWrite(t, m)
-	testCompaction(t, m)
+	testCompaction(t, m, false)
+	time.Sleep(time.Second)
+	testCompaction(t, m, true)
 	testCopyFileRange(t, m)
 	testCloseSession(t, m)
 	base.conf.CaseInsensi = true
@@ -809,8 +811,12 @@ type compactor interface {
 	compactChunk(inode Ino, indx uint32, force bool)
 }
 
-func testCompaction(t *testing.T, m Meta) {
-	_ = m.Init(Format{Name: "test"}, false)
+func testCompaction(t *testing.T, m Meta, trash bool) {
+	if trash {
+		_ = m.Init(Format{Name: "test", TrashDays: 1}, false)
+	} else {
+		_ = m.Init(Format{Name: "test"}, false)
+	}
 	var l sync.Mutex
 	deleted := make(map[uint64]int)
 	m.OnMsg(DeleteChunk, func(args ...interface{}) error {
@@ -898,8 +904,20 @@ func testCompaction(t *testing.T, m Meta) {
 	l.Lock()
 	deletes := len(deleted)
 	l.Unlock()
-	if deletes < 30 {
-		t.Fatalf("deleted chunks %d is less then 30", deletes)
+	if trash {
+		if deletes > 10 {
+			t.Fatalf("deleted chunks %d is greater than 10", deletes)
+		}
+		if len(slices[1]) < 200 {
+			t.Fatalf("list delayed slices %d is less than 200", len(slices[1]))
+		}
+		m.(engine).doCleanupDelayedSlices(time.Now().Unix()+1, 1000)
+		l.Lock()
+		deletes = len(deleted)
+		l.Unlock()
+	}
+	if deletes < 200 {
+		t.Fatalf("deleted chunks %d is less than 200", deletes)
 	}
 }
 
diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go
index 968b8b9cf097..83758eee7240 100644
--- a/pkg/meta/sql.go
+++ b/pkg/meta/sql.go
@@ -85,11 +85,19 @@ type chunk struct {
 	Indx   uint32 `xorm:"unique(chunk) notnull"`
 	Slices []byte `xorm:"blob notnull"`
 }
+
 type chunkRef struct {
 	Chunkid uint64 `xorm:"pk"`
 	Size    uint32 `xorm:"notnull"`
 	Refs    int    `xorm:"notnull"`
 }
+
+type delslices struct {
+	Chunkid uint64 `xorm:"pk"`
+	Deleted int64  `xorm:"notnull"` // timestamp
+	Slices  []byte `xorm:"blob notnull"`
+}
+
 type symlink struct {
 	Inode  Ino    `xorm:"pk"`
 	Target []byte `xorm:"varbinary(4096) notnull"`
@@ -218,8 +226,8 @@ func (m *dbMeta) Init(format Format, force bool) error {
 	if err := m.db.Sync2(new(node), new(symlink), new(xattr)); err != nil {
 		logger.Fatalf("create table node, symlink, xattr: %s", err)
 	}
-	if err := m.db.Sync2(new(chunk), new(chunkRef)); err != nil {
-		logger.Fatalf("create table chunk, chunk_ref: %s", err)
+	if err := m.db.Sync2(new(chunk), new(chunkRef), new(delslices)); err != nil {
+		logger.Fatalf("create table chunk, chunk_ref, delslices: %s", err)
 	}
 	if err := m.db.Sync2(new(session2), new(sustained), new(delfile)); err != nil {
 		logger.Fatalf("create table session2, sustaind, delfile: %s", err)
@@ -297,7 +305,7 @@ func (m *dbMeta) Init(format Format, force bool) error {
 func (m *dbMeta) Reset() error {
 	return m.db.DropTables(&setting{}, &counter{},
 		&node{}, &edge{}, &symlink{}, &xattr{},
-		&chunk{}, &chunkRef{},
+		&chunk{}, &chunkRef{}, &delslices{},
 		&session{}, &session2{}, &sustained{}, &delfile{},
 		&flock{}, &plock{})
 }
@@ -314,9 +322,9 @@ func (m *dbMeta) doLoad() ([]byte, error) {
 }
 
 func (m *dbMeta) doNewSession(sinfo []byte) error {
-	err := m.db.Sync2(new(session2))
+	err := m.db.Sync2(new(session2), new(delslices))
 	if err != nil {
-		return fmt.Errorf("update table session2: %s", err)
+		return fmt.Errorf("update table session2, delslices: %s", err)
 	}
 	// update the owner from uint64 to int64
 	if err = m.db.Sync2(new(flock), new(plock)); err != nil {
@@ -1991,6 +1999,62 @@ func (m *dbMeta) doDeleteFileData(inode Ino, length uint64) {
 	_, _ = m.db.Delete(delfile{Inode: inode})
 }
 
+func (m *dbMeta) doCleanupDelayedSlices(edge int64, limit int) (int, error) {
+	var ds delslices
+	rows, err := m.db.Where("deleted < ?", edge).Limit(limit, 0).Rows(&ds)
+	if err != nil {
+		return 0, err
+	}
+	var result []delslices
+	for rows.Next() {
+		ds.Slices = ds.Slices[:0]
+		if rows.Scan(&ds) == nil {
+			result = append(result, ds)
+		}
+	}
+	_ = rows.Close()
+
+	var count int
+	var ss []Slice
+	for _, ds := range result {
+		if err = m.txn(func(ses *xorm.Session) error {
+			ds := delslices{Chunkid: ds.Chunkid}
+			if ok, e := ses.Get(&ds); e != nil {
+				return e
+			} else if !ok {
+				return nil
+			}
+			ss = ss[:0]
+			m.decodeDelayedSlices(ds.Slices, &ss)
+			if len(ss) == 0 {
+				return fmt.Errorf("invalid value for delayed slices %d: %v", ds.Chunkid, ds.Slices)
+			}
+			for _, s := range ss {
+				if _, e := ses.Exec("update jfs_chunk_ref set refs=refs-1 where chunkid=? and size=?", s.Chunkid, s.Size); e != nil {
+					return e
+				}
+			}
+			_, e := ses.Delete(&delslices{Chunkid: ds.Chunkid})
+			return e
+		}); err != nil {
+			logger.Warnf("Cleanup delayed slices %d: %s", ds.Chunkid, err)
+			continue
+		}
+		for _, s := range ss {
+			var ref = chunkRef{Chunkid: s.Chunkid}
+			ok, e := m.db.Get(&ref)
+			if e == nil && ok && ref.Refs <= 0 {
+				m.deleteSlice(s.Chunkid, s.Size)
+				count++
+			}
+		}
+		if count >= limit {
+			break
+		}
+	}
+	return count, nil
+}
+
 func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) {
 	if !force {
 		// avoid too many or duplicated compaction
@@ -2036,6 +2100,13 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) {
 		}
 		return
 	}
+	var buf []byte
+	trash := m.toTrash(0)
+	if trash {
+		for _, s := range ss {
+			buf = append(buf, m.encodeDelayedSlice(s.chunkid, s.size)...)
+		}
+	}
 	err = m.txn(func(ses *xorm.Session) error {
 		var c2 = chunk{Inode: inode}
 		_, err := ses.Where("indx=?", indx).Get(&c2)
@@ -2055,10 +2126,16 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) {
 		if err = mustInsert(ses, chunkRef{chunkid, size, 1}); err != nil {
 			return err
 		}
-		for _, s := range ss {
-			if _, err := ses.Exec("update jfs_chunk_ref set refs=refs-1 where chunkid=? and size=?", s.chunkid, s.size); err != nil {
+		if trash {
+			if err = mustInsert(ses, &delslices{chunkid, time.Now().Unix(), buf}); err != nil {
 				return err
 			}
+		} else {
+			for _, s := range ss {
+				if _, err := ses.Exec("update jfs_chunk_ref set refs=refs-1 where chunkid=? and size=?", s.chunkid, s.size); err != nil {
and size=?", s.chunkid, s.size); err != nil { + return err + } + } } return nil }) @@ -2081,11 +2158,13 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) { m.deleteSlice(chunkid, size) } else if err == nil { m.of.InvalidateChunk(inode, indx) - for _, s := range ss { - var ref = chunkRef{Chunkid: s.chunkid} - ok, err := m.db.Get(&ref) - if err == nil && ok && ref.Refs <= 0 { - m.deleteSlice(s.chunkid, s.size) + if !trash { + for _, s := range ss { + var ref = chunkRef{Chunkid: s.chunkid} + ok, err := m.db.Get(&ref) + if err == nil && ok && ref.Refs <= 0 { + m.deleteSlice(s.chunkid, s.size) + } } } } else { @@ -2143,7 +2222,6 @@ func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, sh return errno(err) } defer rows.Close() - for rows.Next() { err = rows.Scan(&c) if err != nil { @@ -2159,6 +2237,36 @@ func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, sh } } } + + if ok, err := m.db.IsTableExist(&delslices{}); err != nil { + return errno(err) + } else if !ok { + return 0 + } + var ds delslices + rows2, err := m.db.Rows(&ds) + if err != nil { + return errno(err) + } + var ss []Slice + for rows2.Next() { + ds.Slices = ds.Slices[:0] + if rows2.Scan(&ds) == nil { + ss = ss[:0] + m.decodeDelayedSlices(ds.Slices, &ss) + if showProgress != nil { + for range ss { + showProgress() + } + } + for _, s := range ss { + if s.Chunkid > 0 { + slices[1] = append(slices[1], s) + } + } + } + } + _ = rows2.Close() return 0 } @@ -2702,8 +2810,8 @@ func (m *dbMeta) LoadMeta(r io.Reader) error { if err = m.db.Sync2(new(node), new(edge), new(symlink), new(xattr)); err != nil { return fmt.Errorf("create table node, edge, symlink, xattr: %s", err) } - if err = m.db.Sync2(new(chunk), new(chunkRef)); err != nil { - return fmt.Errorf("create table chunk, chunk_ref: %s", err) + if err = m.db.Sync2(new(chunk), new(chunkRef), new(delslices)); err != nil { + return fmt.Errorf("create table chunk, chunk_ref, delslices: %s", err) } if err = m.db.Sync2(new(session2), new(sustained), new(delfile)); err != nil { return fmt.Errorf("create table session2, sustaind, delfile: %s", err) diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 31f9b890903c..6d9d88d948d9 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -167,6 +167,7 @@ All keys: Fiiiiiiii Flocks Piiiiiiii POSIX locks Kccccccccnnnn slice refs + Lcccccccctttttttt delayed slices SEssssssss session expire time SIssssssss session info SSssssssssiiiiiiii sustained inode @@ -188,6 +189,10 @@ func (m *kvMeta) sliceKey(chunkid uint64, size uint32) []byte { return m.fmtKey("K", chunkid, size) } +func (m *kvMeta) delSliceKey(ts int64, chunkid uint64) []byte { + return m.fmtKey("L", uint64(ts), chunkid) +} + func (m *kvMeta) symKey(inode Ino) []byte { return m.fmtKey("A", inode, "S") } @@ -1771,6 +1776,58 @@ func (m *kvMeta) doDeleteFileData(inode Ino, length uint64) { _ = m.deleteKeys(m.delfileKey(inode, length)) } +func (m *kvMeta) doCleanupDelayedSlices(edge int64, limit int) (int, error) { + // delayed slices: Lcccccccctttttttt + keys, err := m.scanKeys(m.fmtKey("L")) + if err != nil { + logger.Warnf("Scan delayed slices: %s", err) + return 0, err + } + + klen := 1 + 8 + 8 + var count int + var ss []Slice + var rs []int64 + for _, key := range keys { + if len(key) != klen { + continue + } + if m.parseInt64(key[9:]) >= edge { + continue + } + + if err = m.txn(func(tx kvTxn) error { + buf := tx.get(key) + if len(buf) == 0 { + return nil + } + ss, rs = ss[:0], rs[:0] + m.decodeDelayedSlices(buf, &ss) + if 
+				return fmt.Errorf("invalid value for delayed slices %s: %v", key, buf)
+			}
+			for _, s := range ss {
+				rs = append(rs, tx.incrBy(m.sliceKey(s.Chunkid, s.Size), -1))
+			}
+			tx.dels(key)
+			return nil
+		}); err != nil {
+			logger.Warnf("Cleanup delayed slices %s: %s", key, err)
+			continue
+		}
+		for i, s := range ss {
+			if rs[i] < 0 {
+				m.deleteSlice(s.Chunkid, s.Size)
+				count++
+			}
+		}
+		if count >= limit {
+			break
+		}
+	}
+	return count, nil
+}
+
 func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) {
 	if !force {
 		// avoid too many or duplicated compaction
@@ -1818,6 +1875,13 @@ func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) {
 		}
 		return
 	}
+	var dsbuf []byte
+	trash := m.toTrash(0)
+	if trash {
+		for _, s := range ss {
+			dsbuf = append(dsbuf, m.encodeDelayedSlice(s.chunkid, s.size)...)
+		}
+	}
 	err = m.txn(func(tx kvTxn) error {
 		buf2 := tx.get(m.chunkKey(inode, indx))
 		if len(buf2) < len(buf) || !bytes.Equal(buf, buf2[:len(buf)]) {
@@ -1829,8 +1893,12 @@ func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) {
 		tx.set(m.chunkKey(inode, indx), buf2)
 		// create the key to tracking it
 		tx.set(m.sliceKey(chunkid, size), make([]byte, 8))
-		for _, s := range ss {
-			tx.incrBy(m.sliceKey(s.chunkid, s.size), -1)
+		if trash {
+			tx.set(m.delSliceKey(time.Now().Unix(), chunkid), dsbuf)
+		} else {
+			for _, s := range ss {
+				tx.incrBy(m.sliceKey(s.chunkid, s.size), -1)
+			}
 		}
 		return nil
 	})
@@ -1854,13 +1922,15 @@ func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) {
 	} else if err == nil {
 		m.of.InvalidateChunk(inode, indx)
 		m.cleanupZeroRef(chunkid, size)
-		var refs int64
-		for _, s := range ss {
-			if m.client.txn(func(tx kvTxn) error {
-				refs = tx.incrBy(m.sliceKey(s.chunkid, s.size), 0)
-				return nil
-			}) == nil && refs < 0 {
-				m.deleteSlice(s.chunkid, s.size)
+		if !trash {
+			var refs int64
+			for _, s := range ss {
+				if m.client.txn(func(tx kvTxn) error {
+					refs = tx.incrBy(m.sliceKey(s.chunkid, s.size), 0)
+					return nil
+				}) == nil && refs < 0 {
+					m.deleteSlice(s.chunkid, s.size)
+				}
 			}
 		}
 	} else {
@@ -1926,6 +1996,31 @@ func (m *kvMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, sh
 			}
 		}
 	}
+
+	// delayed slices: Lttttttttcccccccc
+	klen = 1 + 8 + 8
+	result, err = m.scanValues(m.fmtKey("L"), -1, func(k, v []byte) bool {
+		return len(k) == klen
+	})
+	if err != nil {
+		logger.Warnf("Scan delayed slices: %s", err)
+		return errno(err)
+	}
+	var ss []Slice
+	for _, value := range result {
+		ss = ss[:0]
+		m.decodeDelayedSlices(value, &ss)
+		if showProgress != nil {
+			for range ss {
+				showProgress()
+			}
+		}
+		for _, s := range ss {
+			if s.Chunkid > 0 {
+				slices[1] = append(slices[1], s)
+			}
+		}
+	}
 
 	return 0
 }
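Note: all three engines store a delayed-slices record as a plain concatenation of 12-byte entries, each an 8-byte chunk ID followed by a 4-byte size, which is why decodeDelayedSlices rejects any buffer whose length is not a multiple of 12. In the KV engine the record is keyed by "L" + 8-byte timestamp + 8-byte chunk ID, so the cleanup pass reads the timestamp from key[1:9]. Below is a minimal standalone sketch of both layouts, assuming big-endian encoding as produced by utils.Buffer's Put64/Put32; the names are illustrative only and not part of this patch.

package main

import (
	"encoding/binary"
	"fmt"
)

// Slice mirrors the two fields a delayed-slice record carries.
type Slice struct {
	Chunkid uint64
	Size    uint32
}

// encodeDelayedSlice appends one 12-byte record: 8-byte chunk ID, then
// 4-byte size (big-endian assumed, as with utils.Buffer).
func encodeDelayedSlice(buf []byte, chunkid uint64, size uint32) []byte {
	var rec [12]byte
	binary.BigEndian.PutUint64(rec[:8], chunkid)
	binary.BigEndian.PutUint32(rec[8:], size)
	return append(buf, rec[:]...)
}

// decodeDelayedSlices walks the concatenated records; a buffer whose
// length is not a multiple of 12 is treated as corrupt and ignored,
// mirroring the guard in baseMeta.decodeDelayedSlices.
func decodeDelayedSlices(buf []byte) []Slice {
	if len(buf) == 0 || len(buf)%12 != 0 {
		return nil
	}
	ss := make([]Slice, 0, len(buf)/12)
	for off := 0; off < len(buf); off += 12 {
		ss = append(ss, Slice{
			Chunkid: binary.BigEndian.Uint64(buf[off : off+8]),
			Size:    binary.BigEndian.Uint32(buf[off+8 : off+12]),
		})
	}
	return ss
}

// delSliceKey builds the 17-byte KV key "L" + timestamp + chunk ID,
// so the timestamp of a key k is recovered from k[1:9].
func delSliceKey(ts int64, chunkid uint64) []byte {
	key := make([]byte, 17)
	key[0] = 'L'
	binary.BigEndian.PutUint64(key[1:9], uint64(ts))
	binary.BigEndian.PutUint64(key[9:17], chunkid)
	return key
}

func main() {
	var buf []byte
	buf = encodeDelayedSlice(buf, 42, 4<<20)
	buf = encodeDelayedSlice(buf, 43, 1<<20)
	fmt.Println(decodeDelayedSlices(buf)) // [{42 4194304} {43 1048576}]

	key := delSliceKey(1650000000, 42)
	ts := int64(binary.BigEndian.Uint64(key[1:9]))
	fmt.Println(len(key), ts) // 17 1650000000
}

Putting the timestamp ahead of the chunk ID in the KV key means a prefix scan over "L" visits entries in roughly chronological order, so doCleanupDelayedSlices hits the expired records first.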