Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type engine interface {
doDeleteSustainedInode(sid uint64, inode Ino) error
doFindDeletedFiles(ts int64, limit int) (map[Ino]uint64, error) // limit < 0 means all
doDeleteFileData(inode Ino, length uint64)
doCleanupSlices()
doCleanupSlices(ctx Context)
doCleanupDelayedSlices(ctx Context, edge int64) (int, error)
doDeleteSlice(id uint64, size uint32) error

Expand Down Expand Up @@ -769,7 +769,9 @@ func (m *baseMeta) cleanupSlices(ctx Context) {
if ok, err := m.en.setIfSmall("nextCleanupSlices", time.Now().Unix(), int64(time.Hour.Seconds())*9/10); err != nil {
logger.Warnf("checking counter nextCleanupSlices: %s", err)
} else if ok {
m.en.doCleanupSlices()
cCtx := WrapWithTimeout(ctx, time.Minute*50)
m.en.doCleanupSlices(cCtx)
cCtx.Cancel()
}
}
}
Expand Down Expand Up @@ -2551,6 +2553,7 @@ func (m *baseMeta) trashEntry(parent, inode Ino, name string) string {

func (m *baseMeta) cleanupTrash(ctx Context) {
defer m.sessWG.Done()
var cCtx Context
for {
select {
case <-ctx.Done():
Expand All @@ -2566,9 +2569,13 @@ func (m *baseMeta) cleanupTrash(ctx Context) {
if ok, err := m.en.setIfSmall("lastCleanupTrash", time.Now().Unix(), int64(time.Hour.Seconds())*9/10); err != nil {
logger.Warnf("checking counter lastCleanupTrash: %s", err)
} else if ok {
if cCtx != nil {
cCtx.Cancel()
cCtx = WrapWithTimeout(ctx, 50*time.Minute)
}
days := m.getFormat().TrashDays
go m.doCleanupTrash(ctx, days, false)
go m.cleanupDelayedSlices(ctx, days)
go m.doCleanupTrash(cCtx, days, false)
go m.cleanupDelayedSlices(cCtx, days)
}
}
}
Expand Down Expand Up @@ -2639,7 +2646,7 @@ func (m *baseMeta) CleanupTrashBefore(ctx Context, edge time.Time, increProgress
rmdir = false
continue
}
if time.Since(now) > 50*time.Minute {
if ctx.Canceled() {
return
}
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/meta/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package meta

import (
"context"
"time"
)

type CtxKey string
Expand Down Expand Up @@ -95,6 +96,11 @@ func WrapWithCancel(ctx context.Context, pid, uid uint32, gids []uint32) Context
return &wrapContext{c, cancel, pid, uid, gids}
}

// WrapWithTimeout derives a new Context from ctx that is cancelled
// automatically once timeout elapses, carrying over the caller's
// pid, uid and gids unchanged.
func WrapWithTimeout(ctx Context, timeout time.Duration) Context {
	timed, cancel := context.WithTimeout(ctx, timeout)
	w := &wrapContext{timed, cancel, ctx.Pid(), ctx.Uid(), ctx.Gids()}
	return w
}

// WrapWithoutCancel adapts a plain context.Context into a Context with the
// given credentials. No cancel function is stored, so cancellation is
// inherited solely from ctx itself.
func WrapWithoutCancel(ctx context.Context, pid, uid uint32, gids []uint32) Context {
	wrapped := &wrapContext{ctx, nil, pid, uid, gids}
	return wrapped
}
Expand Down
23 changes: 11 additions & 12 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2778,10 +2778,8 @@ func (m *redisMeta) doFindDeletedFiles(ts int64, limit int) (map[Ino]uint64, err
return files, nil
}

func (m *redisMeta) doCleanupSlices() {
start := time.Now()
stop := fmt.Errorf("exceeded time limit")
_ = m.hscan(Background(), m.sliceRefs(), func(keys []string) error {
func (m *redisMeta) doCleanupSlices(ctx Context) {
_ = m.hscan(ctx, m.sliceRefs(), func(keys []string) error {
for i := 0; i < len(keys); i += 2 {
key, val := keys[i], keys[i+1]
if strings.HasPrefix(val, "-") { // < 0
Expand All @@ -2796,8 +2794,8 @@ func (m *redisMeta) doCleanupSlices() {
} else if val == "0" {
m.cleanupZeroRef(key)
}
if time.Since(start) > 50*time.Minute {
return stop
if ctx.Canceled() {
return ctx.Err()
}
}
return nil
Expand Down Expand Up @@ -2979,13 +2977,14 @@ func (m *redisMeta) doDeleteFileData_(inode Ino, length uint64, tracking string)
}

func (r *redisMeta) doCleanupDelayedSlices(ctx Context, edge int64) (int, error) {
start := time.Now()
stop := fmt.Errorf("reach limit")
var count int
var ss []Slice
var rs []*redis.IntCmd
err := r.hscan(ctx, r.delSlices(), func(keys []string) error {
for i := 0; i < len(keys); i += 2 {
if ctx.Canceled() {
return ctx.Err()
}
key := keys[i]
ps := strings.Split(key, "_")
if len(ps) != 2 {
Expand Down Expand Up @@ -3029,14 +3028,14 @@ func (r *redisMeta) doCleanupDelayedSlices(ctx Context, edge int64) (int, error)
r.deleteSlice(s.Id, s.Size)
count++
}
if time.Since(start) > 50*time.Minute {
return stop
if ctx.Canceled() {
return ctx.Err()
}
}
}
return nil
})
if err == stop {
if errors.Is(err, context.DeadlineExceeded) {
err = nil
}
return count, err
Expand Down Expand Up @@ -3250,7 +3249,7 @@ func (m *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending,
m.cleanupLeakedChunks(delete)
m.cleanupOldSliceRefs(delete)
if delete {
m.doCleanupSlices()
m.doCleanupSlices(ctx)
}

p := m.rdb.Pipeline()
Expand Down
20 changes: 9 additions & 11 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1713,7 +1713,7 @@ func (m *dbMeta) doMknod(ctx Context, parent Ino, name string, _type uint8, mode
} else {
n.Mode = mode & ^cumask
}
if (pn.Flags&FlagSkipTrash) != 0 {
if (pn.Flags & FlagSkipTrash) != 0 {
n.Flags |= FlagSkipTrash
}

Expand Down Expand Up @@ -1852,7 +1852,7 @@ func (m *dbMeta) doUnlink(ctx Context, parent Ino, name string, attr *Attr, skip
if (n.Flags&FlagAppend) != 0 || (n.Flags&FlagImmutable) != 0 {
return syscall.EPERM
}
if (n.Flags&FlagSkipTrash) != 0 {
if (n.Flags & FlagSkipTrash) != 0 {
trash = 0
}
if trash > 0 && n.Nlink > 1 {
Expand Down Expand Up @@ -2023,7 +2023,7 @@ func (m *dbMeta) doRmdir(ctx Context, parent Ino, name string, pinode *Ino, attr
if exist {
return syscall.ENOTEMPTY
}
if (n.Flags&FlagSkipTrash) != 0 {
if (n.Flags & FlagSkipTrash) != 0 {
trash = 0
}
now := time.Now().UnixNano()
Expand Down Expand Up @@ -2234,7 +2234,7 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst
if (dn.Flags&FlagAppend) != 0 || (dn.Flags&FlagImmutable) != 0 {
return syscall.EPERM
}
if (dn.Flags&FlagSkipTrash) != 0 {
if (dn.Flags & FlagSkipTrash) != 0 {
trash = 0
}
dn.setCtime(now)
Expand Down Expand Up @@ -3076,16 +3076,15 @@ func (m *dbMeta) doFindDeletedFiles(ts int64, limit int) (map[Ino]uint64, error)
return files, err
}

func (m *dbMeta) doCleanupSlices() {
func (m *dbMeta) doCleanupSlices(ctx Context) {
var cks []sliceRef
_ = m.simpleTxn(Background(), func(s *xorm.Session) error {
_ = m.simpleTxn(ctx, func(s *xorm.Session) error {
cks = nil
return s.Where("refs <= 0").Find(&cks)
})
start := time.Now()
for _, ck := range cks {
m.deleteSlice(ck.Id, ck.Size)
if time.Since(start) > 50*time.Minute {
if ctx.Canceled() {
break
}
}
Expand Down Expand Up @@ -3165,7 +3164,6 @@ func (m *dbMeta) doDeleteFileData(inode Ino, length uint64) {
}

func (m *dbMeta) doCleanupDelayedSlices(ctx Context, edge int64) (int, error) {
start := time.Now()
var count int
var ss []Slice
var result []delslices
Expand Down Expand Up @@ -3213,7 +3211,7 @@ func (m *dbMeta) doCleanupDelayedSlices(ctx Context, edge int64) (int, error) {
m.deleteSlice(s.Id, s.Size)
count++
}
if time.Since(start) > 50*time.Minute {
if ctx.Canceled() {
return count, nil
}
}
Expand Down Expand Up @@ -3325,7 +3323,7 @@ func (m *dbMeta) scanAllChunks(ctx Context, ch chan<- cchunk, bar *utils.Bar) er

func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, delete bool, showProgress func()) syscall.Errno {
if delete {
m.doCleanupSlices()
m.doCleanupSlices(ctx)
}
err := m.simpleTxn(ctx, func(s *xorm.Session) error {
var cs []chunk
Expand Down
10 changes: 4 additions & 6 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2278,12 +2278,11 @@ func (m *kvMeta) doFindDeletedFiles(ts int64, limit int) (map[Ino]uint64, error)
return files, err
}

func (m *kvMeta) doCleanupSlices() {
func (m *kvMeta) doCleanupSlices(ctx Context) {
if m.Name() == "tikv" {
m.client.gc()
}
klen := 1 + 8 + 4
start := time.Now()
_ = m.client.scan(m.fmtKey("K"), func(k, v []byte) bool {
if len(k) == klen && len(v) == 8 && parseCounter(v) <= 0 {
rb := utils.FromBuffer(k[1:])
Expand All @@ -2295,7 +2294,7 @@ func (m *kvMeta) doCleanupSlices() {
} else {
m.cleanupZeroRef(id, size)
}
if time.Since(start) > 50*time.Minute {
if ctx.Canceled() {
return false
}
}
Expand Down Expand Up @@ -2359,7 +2358,6 @@ func (m *kvMeta) doDeleteFileData(inode Ino, length uint64) {
}

func (m *kvMeta) doCleanupDelayedSlices(ctx Context, edge int64) (int, error) {
start := time.Now()
var count int
var ss []Slice
var rs []int64
Expand Down Expand Up @@ -2408,7 +2406,7 @@ func (m *kvMeta) doCleanupDelayedSlices(ctx Context, edge int64) (int, error) {
m.deleteSlice(s.Id, s.Size)
count++
}
if time.Since(start) > 50*time.Minute {
if ctx.Canceled() {
return count, nil
}
}
Expand Down Expand Up @@ -2498,7 +2496,7 @@ func (m *kvMeta) scanAllChunks(ctx Context, ch chan<- cchunk, bar *utils.Bar) er

func (m *kvMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, delete bool, showProgress func()) syscall.Errno {
if delete {
m.doCleanupSlices()
m.doCleanupSlices(ctx)
}
// AiiiiiiiiCnnnn file chunks
klen := 1 + 8 + 1 + 4
Expand Down
Loading