Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type engine interface {
doFindDeletedFiles(ts int64, limit int) (map[Ino]uint64, error) // limit < 0 means all
doDeleteFileData(inode Ino, length uint64)
doCleanupSlices()
doCleanupDelayedSlices(edge int64, limit int) (int, error)
doDeleteSlice(chunkid uint64, size uint32) error

doGetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno
Expand Down Expand Up @@ -484,6 +485,22 @@ func (m *baseMeta) marshal(attr *Attr) []byte {
return w.Bytes()
}

// encodeDelayedSlice serializes a (chunkid, size) pair into a fixed
// 12-byte record: 8 bytes of chunk ID followed by 4 bytes of size.
// The result is the unit appended to a delSlices entry.
func (m *baseMeta) encodeDelayedSlice(chunkid uint64, size uint32) []byte {
	buf := utils.NewBuffer(12)
	buf.Put64(chunkid)
	buf.Put32(size)
	return buf.Bytes()
}

// decodeDelayedSlices parses buf — a concatenation of 12-byte records
// produced by encodeDelayedSlice — and appends the decoded slices to ss.
// A buffer that is empty or whose length is not a multiple of the record
// size is silently ignored.
func (m *baseMeta) decodeDelayedSlices(buf []byte, ss *[]Slice) {
	const recordSize = 12 // 8-byte chunk ID + 4-byte size
	if len(buf) == 0 || len(buf)%recordSize != 0 {
		return
	}
	rb := utils.FromBuffer(buf)
	for rb.HasMore() {
		id := rb.Get64()
		sz := rb.Get32()
		*ss = append(*ss, Slice{Chunkid: id, Size: sz})
	}
}

func clearSUGID(ctx Context, cur *Attr, set *Attr) {
switch runtime.GOOS {
case "darwin":
Expand Down Expand Up @@ -888,6 +905,7 @@ func (m *baseMeta) cleanupTrash() {
logger.Warnf("checking counter lastCleanupTrash: %s", err)
} else if ok {
go m.doCleanupTrash(false)
go m.cleanupDelayedSlices()
}
}
}
Expand All @@ -907,6 +925,8 @@ func (m *baseMeta) doCleanupTrash(force bool) {
defer func() {
if count > 0 {
logger.Infof("cleanup trash: deleted %d files in %v", count, time.Since(now))
} else {
logger.Debugf("cleanup trash: nothing to delete")
}
}()

Expand Down Expand Up @@ -951,3 +971,19 @@ func (m *baseMeta) doCleanupTrash(force bool) {
}
}
}

// cleanupDelayedSlices deletes slices that were parked in delSlices by
// compaction while trash was enabled and whose retention period
// (TrashDays) has expired. It runs one bounded pass; a pass that hits
// the limit is logged as a warning so the operator knows more work
// remains for the next run.
func (m *baseMeta) cleanupDelayedSlices() {
	// Cap the number of slices deleted in a single pass.
	const delayedSliceLimit = 3e5
	now := time.Now()
	edge := now.Unix() - int64(m.fmt.TrashDays)*24*3600
	logger.Debugf("Cleanup delayed slices: started with edge %d", edge)
	count, err := m.en.doCleanupDelayedSlices(edge, delayedSliceLimit)
	if err != nil {
		logger.Warnf("Cleanup delayed slices: deleted %d slices in %v, but got error: %s", count, time.Since(now), err)
		return
	}
	if count >= delayedSliceLimit {
		// Original passed a built string as the format argument
		// (vet: non-constant format string); use constant formats.
		logger.Warnf("Cleanup delayed slices: deleted %d slices in %v (reached max limit, stop)", count, time.Since(now))
	} else {
		logger.Debugf("Cleanup delayed slices: deleted %d slices in %v", count, time.Since(now))
	}
}
122 changes: 113 additions & 9 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,10 @@ func (r *redisMeta) delfiles() string {
return r.prefix + "delfiles"
}

// delSlices returns the Redis key of the hash that tracks slices whose
// deletion has been delayed, namespaced by this instance's key prefix.
func (r *redisMeta) delSlices() string {
	const suffix = "delSlices"
	return r.prefix + suffix
}

func (r *redisMeta) allSessions() string {
return r.prefix + "allSessions"
}
Expand Down Expand Up @@ -2244,6 +2248,70 @@ func (r *redisMeta) doDeleteFileData_(inode Ino, length uint64, tracking string)
_ = r.rdb.ZRem(ctx, r.delfiles(), tracking)
}

// doCleanupDelayedSlices scans the delSlices hash for fields whose name
// encodes an expired timestamp ("<chunkid>_<ts>" with ts < edge),
// decrements the reference count of every slice recorded in the field's
// value, and physically deletes slices whose count drops below zero.
// It stops once `limit` slices have been deleted and returns how many
// slices were actually removed.
func (r *redisMeta) doCleanupDelayedSlices(edge int64, limit int) (int, error) {
	ctx := Background
	// Sentinel error used to abort the scan early when limit is reached;
	// compared by identity below.
	stop := fmt.Errorf("reach limit")
	var count int
	var ss []Slice
	var rs []*redis.IntCmd
	err := r.hscan(ctx, r.delSlices(), func(keys []string) error {
		// HSCAN yields alternating field/value pairs; only the field
		// names are used here — values are re-read inside the
		// transaction so they are consistent under WATCH.
		for i := 0; i < len(keys); i += 2 {
			key := keys[i]
			ps := strings.Split(key, "_")
			if len(ps) != 2 {
				logger.Warnf("Invalid key %s", key)
				continue
			}
			if ts, e := strconv.ParseInt(ps[1], 10, 64); e != nil {
				logger.Warnf("Invalid key %s", key)
				continue
			} else if ts >= edge {
				// Not expired yet; leave it for a later pass.
				continue
			}

			if err := r.txn(ctx, func(tx *redis.Tx) error {
				val, e := tx.HGet(ctx, r.delSlices(), key).Result()
				if e == redis.Nil {
					// Already removed (e.g. by a concurrent cleaner).
					return nil
				} else if e != nil {
					return e
				}
				// Reset scratch buffers here: the txn callback may be
				// retried, and stale entries must not leak between tries.
				ss, rs = ss[:0], rs[:0]
				buf := []byte(val)
				r.decodeDelayedSlices(buf, &ss)
				if len(ss) == 0 {
					return fmt.Errorf("invalid value for delSlices %s: %v", key, buf)
				}
				// Atomically decrement all refcounts and drop the entry.
				_, e = tx.Pipelined(ctx, func(pipe redis.Pipeliner) error {
					for _, s := range ss {
						rs = append(rs, pipe.HIncrBy(ctx, r.sliceRefs(), r.sliceKey(s.Chunkid, s.Size), -1))
					}
					pipe.HDel(ctx, r.delSlices(), key)
					return nil
				})
				return e
			}, r.delSlices()); err != nil {
				// Best effort: log and move on to the next entry.
				logger.Warnf("Cleanup delSlices %s: %s", key, err)
				continue
			}
			// A refcount below zero means no chunk references the slice
			// any more, so its data can be deleted from object storage.
			for i, s := range ss {
				if rs[i].Err() == nil && rs[i].Val() < 0 {
					r.deleteSlice(s.Chunkid, s.Size)
					count++
				}
			}
			if count >= limit {
				return stop
			}
		}
		return nil
	})
	// NOTE(review): this identity comparison assumes r.hscan propagates
	// the callback's error unwrapped — verify in hscan's implementation.
	if err == stop {
		err = nil // reaching the limit is expected, not a failure
	}
	return count, err
}

func (r *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
// avoid too many or duplicated compaction
if !force {
Expand Down Expand Up @@ -2289,7 +2357,14 @@ func (r *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
}
return
}
var rs []*redis.IntCmd
var buf []byte // trash enabled: track delayed slices
var rs []*redis.IntCmd // trash disabled: check reference of slices
trash := r.toTrash(0)
if trash {
for _, s := range ss {
buf = append(buf, r.encodeDelayedSlice(s.chunkid, s.size)...)
}
}
key := r.chunkKey(inode, indx)
errno := errno(r.txn(ctx, func(tx *redis.Tx) error {
rs = nil
Expand All @@ -2313,8 +2388,12 @@ func (r *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
pipe.LPush(ctx, key, vals[i-1])
}
pipe.HSet(ctx, r.sliceRefs(), r.sliceKey(chunkid, size), "0") // create the key to tracking it
for _, s := range ss {
rs = append(rs, pipe.HIncrBy(ctx, r.sliceRefs(), r.sliceKey(s.chunkid, s.size), -1))
if trash {
pipe.HSet(ctx, r.delSlices(), fmt.Sprintf("%d_%d", chunkid, time.Now().Unix()), buf)
} else {
for _, s := range ss {
rs = append(rs, pipe.HIncrBy(ctx, r.sliceRefs(), r.sliceKey(s.chunkid, s.size), -1))
}
}
return nil
})
Expand All @@ -2336,9 +2415,11 @@ func (r *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
} else if errno == 0 {
r.of.InvalidateChunk(inode, indx)
r.cleanupZeroRef(r.sliceKey(chunkid, size))
for i, s := range ss {
if rs[i].Err() == nil && rs[i].Val() < 0 {
r.deleteSlice(s.chunkid, s.size)
if !trash {
for i, s := range ss {
if rs[i].Err() == nil && rs[i].Val() < 0 {
r.deleteSlice(s.chunkid, s.size)
}
}
}
} else {
Expand Down Expand Up @@ -2495,14 +2576,14 @@ func (r *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool,
}

p := r.rdb.Pipeline()
return errno(r.scan(ctx, "c*_*", func(keys []string) error {
err := r.scan(ctx, "c*_*", func(keys []string) error {
for _, key := range keys {
_ = p.LRange(ctx, key, 0, 100000000)
}
cmds, err := p.Exec(ctx)
if err != nil {
logger.Warnf("list slices: %s", err)
return errno(err)
return err
}
for _, cmd := range cmds {
key := cmd.(*redis.StringSliceCmd).Args()[1].(string)
Expand All @@ -2519,7 +2600,30 @@ func (r *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool,
}
}
return nil
}))
})
if err != nil {
return errno(err)
}

var ss []Slice
err = r.hscan(ctx, r.delSlices(), func(keys []string) error {
for i := 0; i < len(keys); i += 2 {
ss = ss[:0]
r.decodeDelayedSlices([]byte(keys[i+1]), &ss)
if showProgress != nil {
for range ss {
showProgress()
}
}
for _, s := range ss {
if s.Chunkid > 0 {
slices[1] = append(slices[1], s)
}
}
}
return nil
})
return errno(err)
}

func (r *redisMeta) GetXattr(ctx Context, inode Ino, name string, vbuff *[]byte) syscall.Errno {
Expand Down
28 changes: 23 additions & 5 deletions pkg/meta/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ func testMeta(t *testing.T, m Meta) {
testStickyBit(t, m)
testLocks(t, m)
testConcurrentWrite(t, m)
testCompaction(t, m)
testCompaction(t, m, false)
time.Sleep(time.Second)
testCompaction(t, m, true)
testCopyFileRange(t, m)
testCloseSession(t, m)
base.conf.CaseInsensi = true
Expand Down Expand Up @@ -809,8 +811,12 @@ type compactor interface {
compactChunk(inode Ino, indx uint32, force bool)
}

func testCompaction(t *testing.T, m Meta) {
_ = m.Init(Format{Name: "test"}, false)
func testCompaction(t *testing.T, m Meta, trash bool) {
if trash {
_ = m.Init(Format{Name: "test", TrashDays: 1}, false)
} else {
_ = m.Init(Format{Name: "test"}, false)
}
var l sync.Mutex
deleted := make(map[uint64]int)
m.OnMsg(DeleteChunk, func(args ...interface{}) error {
Expand Down Expand Up @@ -898,8 +904,20 @@ func testCompaction(t *testing.T, m Meta) {
l.Lock()
deletes := len(deleted)
l.Unlock()
if deletes < 30 {
t.Fatalf("deleted chunks %d is less then 30", deletes)
if trash {
if deletes > 10 {
t.Fatalf("deleted chunks %d is greater than 10", deletes)
}
if len(slices[1]) < 200 {
t.Fatalf("list delayed slices %d is less than 200", len(slices[1]))
}
m.(engine).doCleanupDelayedSlices(time.Now().Unix()+1, 1000)
l.Lock()
deletes = len(deleted)
l.Unlock()
}
if deletes < 200 {
t.Fatalf("deleted chunks %d is less than 200", deletes)
}
}

Expand Down
Loading