Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func copyFileOnWindows(srcPath, destPath string) error {

func copyFile(srcPath, destPath string, requireRootPrivileges bool) error {
if runtime.GOOS == "windows" {
return utils.WithTimeout(func(context.Context) error {
return utils.WithTimeout(context.TODO(), func(context.Context) error {
return copyFileOnWindows(srcPath, destPath)
}, 3*time.Second)
}
Expand Down Expand Up @@ -148,7 +148,7 @@ func closeFile(file *os.File) {

func getPprofPort(pid, amp string, requireRootPrivileges bool) (int, error) {
cfg := vfs.Config{}
_ = utils.WithTimeout(func(context.Context) error {
_ = utils.WithTimeout(context.TODO(), func(context.Context) error {
content, err := readConfig(amp)
if err != nil {
logger.Warnf("failed to read config file: %v", err)
Expand Down Expand Up @@ -467,7 +467,7 @@ func collectSysInfo(ctx *cli.Context, currDir string) error {
func collectSpecialFile(ctx *cli.Context, amp string, currDir string, requireRootPrivileges bool, wg *sync.WaitGroup) error {
prefixed := true
configName := ".jfs.config"
_ = utils.WithTimeout(func(context.Context) error {
_ = utils.WithTimeout(context.TODO(), func(context.Context) error {
if !utils.Exists(filepath.Join(amp, configName)) {
configName = ".config"
prefixed = false
Expand Down Expand Up @@ -506,7 +506,7 @@ func debug(ctx *cli.Context) error {
setup(ctx, 1)
mp := ctx.Args().First()
var inode uint64
if err := utils.WithTimeout(func(context.Context) error {
if err := utils.WithTimeout(context.TODO(), func(context.Context) error {
var err error
if inode, err = utils.GetFileInode(mp); err != nil {
return fmt.Errorf("failed to lookup inode for %s: %s", mp, err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/debug_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (

func getCmdMount(mp string) (uid, pid, cmd string, err error) {
var tmpPid string
_ = utils.WithTimeout(func(context.Context) error {
_ = utils.WithTimeout(context.TODO(), func(context.Context) error {
content, err := readConfig(mp)
if err != nil {
logger.Warnf("failed to read config file: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/debug_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func getProcessUserSid(pid int) (string, error) {

func getCmdMount(mp string) (uid, pid, cmd string, err error) {
var tmpPid string
_ = utils.WithTimeout(func(context.Context) error {
_ = utils.WithTimeout(context.TODO(), func(context.Context) error {
content, err := readConfig(mp)
if err != nil {
logger.Warnf("failed to read config file: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func mount(c *cli.Context) error {

var err error
if stage == 0 || supervisor == "test" {
err = utils.WithTimeout(func(context.Context) error {
err = utils.WithTimeout(context.TODO(), func(context.Context) error {
mp, err = filepath.Abs(mp)
return err
}, time.Second*3)
Expand Down
6 changes: 3 additions & 3 deletions cmd/mount_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,13 +500,13 @@ func prepareMp(mp string) {
}
var fi os.FileInfo
var ino uint64
err := utils.WithTimeout(func(context.Context) error {
err := utils.WithTimeout(context.TODO(), func(context.Context) error {
var err error
fi, err = os.Stat(mp)
return err
}, time.Second*3)
if !strings.Contains(mp, ":") && err != nil {
err2 := utils.WithTimeout(func(context.Context) error {
err2 := utils.WithTimeout(context.TODO(), func(context.Context) error {
return os.MkdirAll(mp, 0777)
}, time.Second*3)
if err2 != nil {
Expand Down Expand Up @@ -600,7 +600,7 @@ func canShutdownGracefully(mp string, newConf *vfs.Config) bool {
}
var ino uint64
var err error
err = utils.WithTimeout(func(context.Context) error {
err = utils.WithTimeout(context.TODO(), func(context.Context) error {
ino, err = utils.GetFileInode(mp)
return err
}, time.Second*3)
Expand Down
28 changes: 17 additions & 11 deletions pkg/chunk/cached_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ func (s *rSlice) ReadAt(ctx context.Context, page *Page, off int) (n int, err er
sc = object.DefaultStorageClass
)
page.Acquire()
err = utils.WithTimeout(func(ctx context.Context) error {
err = utils.WithTimeout(ctx, func(cCtx context.Context) error {
defer page.Release()
in, err := s.store.storage.Get(ctx, key, int64(boff), int64(len(p)), object.WithRequestID(&reqID), object.WithStorageClass(&sc))
in, err := s.store.storage.Get(cCtx, key, int64(boff), int64(len(p)), object.WithRequestID(&reqID), object.WithStorageClass(&sc))
if err == nil {
n, err = io.ReadFull(in, p)
_ = in.Close()
Expand All @@ -181,6 +181,9 @@ func (s *rSlice) ReadAt(ctx context.Context, page *Page, off int) (n int, err er
}, s.store.conf.GetTimeout)
used := time.Since(st)
logRequest("GET", key, fmt.Sprintf("RANGE(%d,%d) ", boff, len(p)), reqID, err, used)
if errors.Is(err, context.Canceled) {
return 0, err
}
s.store.objectDataBytes.WithLabelValues("GET", sc).Add(float64(n))
s.store.objectReqsHistogram.WithLabelValues("GET", sc).Observe(used.Seconds())
if err == nil {
Expand All @@ -199,7 +202,7 @@ func (s *rSlice) ReadAt(ctx context.Context, page *Page, off int) (n int, err er
} else {
tmp.Acquire()
}
err = s.store.load(key, tmp, s.store.shouldCache(blockSize), false)
err = s.store.load(ctx, key, tmp, s.store.shouldCache(blockSize), false)
return tmp, err
})
defer block.Release()
Expand Down Expand Up @@ -349,7 +352,7 @@ func (store *cachedStore) put(key string, p *Page) error {
reqID string
sc = object.DefaultStorageClass
)
return utils.WithTimeout(func(ctx context.Context) error {
return utils.WithTimeout(context.TODO(), func(ctx context.Context) error {
defer p.Release()
st := time.Now()
err := store.storage.Put(ctx, key, bytes.NewReader(p.Data), object.WithRequestID(&reqID), object.WithStorageClass(&sc))
Expand All @@ -367,7 +370,7 @@ func (store *cachedStore) put(key string, p *Page) error {
func (store *cachedStore) delete(key string) error {
st := time.Now()
var reqID string
err := utils.WithTimeout(func(ctx context.Context) error {
err := utils.WithTimeout(context.TODO(), func(ctx context.Context) error {
return store.storage.Delete(ctx, key, object.WithRequestID(&reqID))
}, store.conf.PutTimeout)
used := time.Since(st)
Expand Down Expand Up @@ -455,7 +458,7 @@ func (s *wSlice) upload(indx int) {
stagingPath := "unknown"
stageFailed := false
block.Acquire()
err := utils.WithTimeout(func(context.Context) (err error) { // In case it hangs for more than 5 minutes(see fileWriter.flush), fallback to uploading directly to avoid `EIO`
err := utils.WithTimeout(context.TODO(), func(context.Context) (err error) { // In case it hangs for more than 5 minutes(see fileWriter.flush), fallback to uploading directly to avoid `EIO`
defer block.Release()
stagingPath, err = s.store.bcache.stage(key, block.Data)
if err == nil && stageFailed { // upload thread already marked me as failed because of timeout
Expand Down Expand Up @@ -714,7 +717,7 @@ func logRequest(typeStr, key, param, reqID string, err error, used time.Duration
}
}

func (store *cachedStore) load(key string, page *Page, cache bool, forceCache bool) (err error) {
func (store *cachedStore) load(ctx context.Context, key string, page *Page, cache bool, forceCache bool) (err error) {
defer func() {
e := recover()
if e != nil {
Expand Down Expand Up @@ -743,10 +746,10 @@ func (store *cachedStore) load(key string, page *Page, cache bool, forceCache bo
p = page
}
p.Acquire()
err = utils.WithTimeout(func(ctx context.Context) error {
err = utils.WithTimeout(ctx, func(cCtx context.Context) error {
defer p.Release()
// it will be retried in the upper layer.
in, err = store.storage.Get(ctx, key, 0, -1, object.WithRequestID(&reqID), object.WithStorageClass(&sc))
in, err = store.storage.Get(cCtx, key, 0, -1, object.WithRequestID(&reqID), object.WithStorageClass(&sc))
if err == nil {
n, err = io.ReadFull(in, p.Data)
_ = in.Close()
Expand All @@ -756,6 +759,9 @@ func (store *cachedStore) load(key string, page *Page, cache bool, forceCache bo
}
return err
}, store.conf.GetTimeout)
if errors.Is(err, context.Canceled) {
return err
}
used := time.Since(start)
logRequest("GET", key, "", reqID, err, used)
if store.downLimit != nil && compressed {
Expand Down Expand Up @@ -851,7 +857,7 @@ func NewCachedStore(storage object.ObjectStorage, config Config, reg prometheus.
defer p.Release()
block, err := store.group.Execute(key, func() (*Page, error) { // dedup requests with full read
p.Acquire()
err := store.load(key, p, false, false) // delay writing cache until singleflight ends to prevent blocking waiters
err := store.load(context.TODO(), key, p, false, false) // delay writing cache until singleflight ends to prevent blocking waiters
return p, err
})
defer block.Release()
Expand Down Expand Up @@ -1136,7 +1142,7 @@ func (store *cachedStore) FillCache(id uint64, length uint32) error {
continue
}
p := NewOffPage(size)
if e := store.load(k, p, true, true); e != nil {
if e := store.load(context.TODO(), k, p, true, true); e != nil {
logger.Warnf("Failed to load key: %s %s", k, e)
err = e
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/chunk/cached_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,6 @@ func TestStoreRetry(t *testing.T) {
cs := NewCachedStore(s, defaultConf, nil)
p := NewPage(nil)
defer p.Release()
cs.(*cachedStore).load("non", p, false, false) // wont retry
cs.(*cachedStore).load(context.TODO(), "non", p, false, false) // wont retry
require.Equal(t, int32(1), s.cnt)
}
4 changes: 3 additions & 1 deletion pkg/chunk/prefetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package chunk

import "sync"
import (
"sync"
)

type prefetcher struct {
sync.Mutex
Expand Down
6 changes: 3 additions & 3 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,15 +827,15 @@ func (m *baseMeta) statRootFs(ctx Context, totalspace, availspace, iused, iavail
var err error
if !m.conf.FastStatfs || used == unknownUsage || inodes == unknownUsage {
var remoteUsed int64 // using an additional variable here to ensure the assignment inside `utils.WithTimeout` does not change the `used` variable again after a timeout.
err = utils.WithTimeout(func(context.Context) error {
err = utils.WithTimeout(ctx, func(context.Context) error {
remoteUsed, err = m.en.getCounter(usedSpace)
return err
}, time.Millisecond*150)
if err == nil {
used = remoteUsed
}
var remoteInodes int64
err = utils.WithTimeout(func(context.Context) error {
err = utils.WithTimeout(ctx, func(context.Context) error {
remoteInodes, err = m.en.getCounter(totalInodes)
return err
}, time.Millisecond*150)
Expand Down Expand Up @@ -1073,7 +1073,7 @@ func (m *baseMeta) GetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno {
if inode == RootInode || inode == TrashInode {
// doGetAttr could overwrite the `attr` after timeout
var a Attr
e := utils.WithTimeout(func(context.Context) error {
e := utils.WithTimeout(ctx, func(context.Context) error {
err = m.en.doGetAttr(ctx, inode, &a)
return nil
}, time.Millisecond*300)
Expand Down
7 changes: 5 additions & 2 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,19 @@ func FindLocalIPs() ([]net.IP, error) {
return ips, nil
}

func WithTimeout(f func(context.Context) error, timeout time.Duration) error {
func WithTimeout(pCtx context.Context, f func(context.Context) error, timeout time.Duration) error {
var done = make(chan int, 1)
var t = time.NewTimer(timeout)
var err error
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(pCtx)
go func() {
err = f(ctx)
done <- 1
}()
select {
case <-ctx.Done():
err = ctx.Err()
t.Stop()
case <-done:
t.Stop()
case <-t.C:
Expand Down
4 changes: 2 additions & 2 deletions pkg/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ func TestLocalIp(t *testing.T) {
}

func TestTimeout(t *testing.T) {
err := WithTimeout(func(context.Context) error {
err := WithTimeout(context.TODO(), func(context.Context) error {
return nil
}, time.Millisecond*10)
if err != nil {
t.Fatalf("fast function should return nil")
}
err = WithTimeout(func(context.Context) error {
err = WithTimeout(context.TODO(), func(context.Context) error {
time.Sleep(time.Millisecond * 100)
return nil
}, time.Millisecond*10)
Expand Down
7 changes: 6 additions & 1 deletion pkg/vfs/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func (r *frange) include(a *frange) bool { return r.off <= a.off && a.end() <= r

// protected by file
type sliceReader struct {
ctx context.Context
cancel context.CancelFunc
file *fileReader
block *frange
state sstate
Expand Down Expand Up @@ -202,7 +204,8 @@ func (s *sliceReader) run() {
p := s.page.Slice(0, int(need))
defer p.Release()
var n int
ctx := context.WithValue(context.TODO(), meta.CtxKey("inode"), inode) // Output inode in log for debugging

ctx := context.WithValue(s.ctx, meta.CtxKey("inode"), inode) // Output inode in log for debugging
n = f.r.Read(ctx, p, slices, (uint32(s.block.off))%meta.ChunkSize)

f.Lock()
Expand Down Expand Up @@ -258,6 +261,7 @@ func (s *sliceReader) drop() {
s.state = INVALID // somebody still using it, so mark it for removal
}
}
s.cancel()
}

func (s *sliceReader) delete() {
Expand Down Expand Up @@ -306,6 +310,7 @@ func (f *fileReader) GetLength() uint64 {
// protected by f
func (f *fileReader) newSlice(block *frange) *sliceReader {
s := &sliceReader{}
s.ctx, s.cancel = context.WithCancel(context.Background())
s.file = f
s.lastAccess = time.Now()
s.indx = uint32(block.off / meta.ChunkSize)
Expand Down