Skip to content

Commit ac9a444

Browse files
committed
feat: beautify cdn log
Signed-off-by: santong <[email protected]>
1 parent fc55eaa commit ac9a444

File tree

11 files changed

+89
-40
lines changed

11 files changed

+89
-40
lines changed

cdnsystem/supervisor/cdn/cache_detector.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (cd *cacheDetector) detectCache(ctx context.Context, task *types.SeedTask,
7474
}()
7575
result, err = cd.doDetect(ctx, task, fileDigest)
7676
if err != nil {
77-
logger.WithTaskID(task.TaskID).Infof("failed to detect cache, reset cache: %v", err)
77+
task.Log().Infof("failed to detect cache, reset cache: %v", err)
7878
metaData, err := cd.resetCache(task)
7979
if err == nil {
8080
result = &cacheResult{
@@ -85,7 +85,7 @@ func (cd *cacheDetector) detectCache(ctx context.Context, task *types.SeedTask,
8585
return result, err
8686
}
8787
if err := cd.cacheDataManager.updateAccessTime(task.TaskID, getCurrentTimeMillisFunc()); err != nil {
88-
logger.WithTaskID(task.TaskID).Warnf("failed to update task access time ")
88+
task.Log().Warnf("failed to update task access time ")
8989
}
9090
return result, nil
9191
}
@@ -107,9 +107,9 @@ func (cd *cacheDetector) doDetect(ctx context.Context, task *types.SeedTask, fil
107107
expired, err := source.IsExpired(ctx, task.URL, task.Header, fileMetaData.ExpireInfo)
108108
if err != nil {
109109
// 如果获取失败,则认为没有过期,防止打爆源
110-
logger.WithTaskID(task.TaskID).Errorf("failed to check if the task expired: %v", err)
110+
task.Log().Errorf("failed to check if the task expired: %v", err)
111111
}
112-
logger.WithTaskID(task.TaskID).Debugf("task expired result: %t", expired)
112+
task.Log().Debugf("task expired result: %t", expired)
113113
if expired {
114114
return nil, cdnerrors.ErrResourceExpired{URL: task.URL}
115115
}

cdnsystem/supervisor/cdn/downloader.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"io"
2323

2424
"d7y.io/dragonfly/v2/cdnsystem/types"
25-
logger "d7y.io/dragonfly/v2/internal/dflog"
2625
"d7y.io/dragonfly/v2/pkg/source"
2726
"d7y.io/dragonfly/v2/pkg/structure/maputils"
2827
"d7y.io/dragonfly/v2/pkg/util/rangeutils"
@@ -43,7 +42,7 @@ func (cm *Manager) download(ctx context.Context, task *types.SeedTask, detectRes
4342
headers[RangeHeaderName] = fmt.Sprintf("bytes=%s", breakRange)
4443
}
4544
}
46-
logger.WithTaskID(task.TaskID).Infof("start download url %s at range: %d-%d: with header: %+v", task.URL, detectResult.breakPoint,
45+
task.Log().Infof("start download url %s at range: %d-%d: with header: %+v", task.URL, detectResult.breakPoint,
4746
task.SourceFileLength, task.Header)
4847
reader, responseHeader, err := source.DownloadWithResponseHeader(ctx, task.URL, headers)
4948
// update Expire info

cdnsystem/supervisor/cdn/manager.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,15 +113,15 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTa
113113
return seedTask, errors.Wrapf(err, "failed to detect cache")
114114
}
115115
span.SetAttributes(config.AttributeCacheResult.String(detectResult.String()))
116-
logger.WithTaskID(task.TaskID).Debugf("detects cache result: %+v", detectResult)
116+
task.Log().Debugf("detects cache result: %+v", detectResult)
117117
// second: report detect result
118118
err = cm.cdnReporter.reportCache(ctx, task.TaskID, detectResult)
119119
if err != nil {
120-
logger.WithTaskID(task.TaskID).Errorf("failed to report cache, reset detectResult: %v", err)
120+
task.Log().Errorf("failed to report cache, reset detectResult: %v", err)
121121
}
122122
// full cache
123123
if detectResult.breakPoint == -1 {
124-
logger.WithTaskID(task.TaskID).Infof("cache full hit on local")
124+
task.Log().Infof("cache full hit on local")
125125
seedTask.UpdateTaskInfo(types.TaskInfoCdnStatusSuccess, detectResult.fileMetaData.SourceRealDigest, detectResult.fileMetaData.PieceMd5Sign,
126126
detectResult.fileMetaData.SourceFileLen, detectResult.fileMetaData.CdnFileLength)
127127
return seedTask, nil
@@ -148,7 +148,7 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.SeedTask) (seedTa
148148
if err != nil {
149149
server.StatSeedFinish(task.TaskID, task.URL, false, err, start.Nanosecond(), time.Now().Nanosecond(), downloadMetadata.backSourceLength,
150150
downloadMetadata.realSourceFileLength)
151-
logger.WithTaskID(task.TaskID).Errorf("failed to write for task: %v", err)
151+
task.Log().Errorf("failed to write for task: %v", err)
152152
seedTask.UpdateStatus(types.TaskInfoCdnStatusFailed)
153153
return seedTask, err
154154
}
@@ -179,7 +179,7 @@ func (cm *Manager) TryFreeSpace(fileLength int64) (bool, error) {
179179
}
180180

181181
func (cm *Manager) handleCDNResult(task *types.SeedTask, sourceDigest string, downloadMetadata *downloadMetadata) (bool, error) {
182-
logger.WithTaskID(task.TaskID).Debugf("handle cdn result, downloadMetaData: %+v", downloadMetadata)
182+
task.Log().Debugf("handle cdn result, downloadMetaData: %+v", downloadMetadata)
183183
var isSuccess = true
184184
var errorMsg string
185185
// check md5
@@ -222,7 +222,7 @@ func (cm *Manager) handleCDNResult(task *types.SeedTask, sourceDigest string, do
222222
return false, errors.New(errorMsg)
223223
}
224224

225-
logger.WithTaskID(task.TaskID).Infof("success to get task, downloadMetadata: %+v realDigest: %s", downloadMetadata, sourceDigest)
225+
task.Log().Infof("success to get task, downloadMetadata: %+v realDigest: %s", downloadMetadata, sourceDigest)
226226

227227
return true, nil
228228
}

cdnsystem/supervisor/cdn/storage/hybrid/hybrid.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ func (h *hybridStorageMgr) CreateUploadLink(taskID string) error {
276276

277277
func (h *hybridStorageMgr) ResetRepo(task *types.SeedTask) error {
278278
if err := h.deleteTaskFiles(task.TaskID, false, true); err != nil {
279-
logger.WithTaskID(task.TaskID).Errorf("reset repo: failed to delete task files: %v", err)
279+
task.Log().Errorf("reset repo: failed to delete task files: %v", err)
280280
}
281281
// 判断是否有足够空间存放
282282
shmPath, err := h.tryShmSpace(task.URL, task.TaskID, task.SourceFileLength)

cdnsystem/supervisor/task/manager.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -83,18 +83,18 @@ func (tm *Manager) Register(ctx context.Context, req *types.TaskRegisterRequest)
8383
}
8484
taskBytes, _ := json.Marshal(task)
8585
span.SetAttributes(config.AttributeTaskInfo.String(string(taskBytes)))
86-
logger.WithTaskID(task.TaskID).Debugf("success get task info: %+v", task)
86+
task.Log().Debugf("success get task info: %+v", task)
8787

8888
// update accessTime for taskId
8989
if err := tm.accessTimeMap.Add(task.TaskID, time.Now()); err != nil {
90-
logger.WithTaskID(task.TaskID).Warnf("failed to update accessTime: %v", err)
90+
task.Log().Warnf("failed to update accessTime: %v", err)
9191
}
9292

9393
// trigger CDN
9494
if err := tm.triggerCdnSyncAction(ctx, task); err != nil {
9595
return nil, errors.Wrapf(err, "trigger cdn")
9696
}
97-
logger.WithTaskID(task.TaskID).Infof("successfully trigger cdn sync action")
97+
task.Log().Infof("successfully trigger cdn sync action")
9898
// watch seed progress
9999
return tm.progressMgr.WatchSeedProgress(ctx, task.TaskID)
100100
}
@@ -107,7 +107,7 @@ func (tm *Manager) triggerCdnSyncAction(ctx context.Context, task *types.SeedTas
107107
synclock.Lock(task.TaskID, true)
108108
if !task.IsFrozen() {
109109
span.SetAttributes(config.AttributeTaskStatus.String(task.CdnStatus))
110-
logger.WithTaskID(task.TaskID).Infof("seedTask is running or has been downloaded successfully, status: %s", task.CdnStatus)
110+
task.Log().Infof("seedTask is running or has been downloaded successfully, status: %s", task.CdnStatus)
111111
synclock.UnLock(task.TaskID, true)
112112
return nil
113113
}
@@ -118,12 +118,12 @@ func (tm *Manager) triggerCdnSyncAction(ctx context.Context, task *types.SeedTas
118118
// reconfirm
119119
span.SetAttributes(config.AttributeTaskStatus.String(task.CdnStatus))
120120
if !task.IsFrozen() {
121-
logger.WithTaskID(task.TaskID).Infof("reconfirm find seedTask is running or has been downloaded successfully, status: %s", task.CdnStatus)
121+
task.Log().Infof("reconfirm find seedTask is running or has been downloaded successfully, status: %s", task.CdnStatus)
122122
return nil
123123
}
124124
if task.IsWait() {
125125
tm.progressMgr.InitSeedProgress(ctx, task.TaskID)
126-
logger.WithTaskID(task.TaskID).Infof("successfully init seed progress for task")
126+
task.Log().Infof("successfully init seed progress for task")
127127
}
128128
updatedTask, err := tm.updateTask(task.TaskID, &types.SeedTask{
129129
CdnStatus: types.TaskInfoCdnStatusRunning,
@@ -135,19 +135,19 @@ func (tm *Manager) triggerCdnSyncAction(ctx context.Context, task *types.SeedTas
135135
go func() {
136136
updateTaskInfo, err := tm.cdnMgr.TriggerCDN(ctx, task)
137137
if err != nil {
138-
logger.WithTaskID(task.TaskID).Errorf("trigger cdn get error: %v", err)
138+
task.Log().Errorf("trigger cdn get error: %v", err)
139139
}
140140
go func() {
141141
if err := tm.progressMgr.PublishTask(ctx, task.TaskID, updateTaskInfo); err != nil {
142-
logger.WithTaskID(task.TaskID).Errorf("failed to publish task: %v", err)
142+
task.Log().Errorf("failed to publish task: %v", err)
143143
}
144144

145145
}()
146146
updatedTask, err = tm.updateTask(task.TaskID, updateTaskInfo)
147147
if err != nil {
148-
logger.WithTaskID(task.TaskID).Errorf("failed to update task: %v", err)
148+
task.Log().Errorf("failed to update task: %v", err)
149149
}
150-
logger.WithTaskID(task.TaskID).Infof("successfully update task cdn updatedTask: %+v", updatedTask)
150+
task.Log().Infof("successfully update task cdn updatedTask: %+v", updatedTask)
151151
}()
152152
return nil
153153
}

cdnsystem/supervisor/task/manager_util.go

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,6 @@ import (
3535
"go.opentelemetry.io/otel/trace"
3636
)
3737

38-
const (
39-
IllegalSourceFileLen = -100
40-
)
41-
4238
// addOrUpdateTask add a new task or update exist task
4339
func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegisterRequest) (*types.SeedTask, error) {
4440
var span trace.Span
@@ -67,15 +63,7 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegis
6763
}
6864

6965
var task *types.SeedTask
70-
newTask := &types.SeedTask{
71-
TaskID: taskID,
72-
Header: request.Header,
73-
RequestDigest: request.Digest,
74-
URL: request.URL,
75-
TaskURL: taskURL,
76-
CdnStatus: types.TaskInfoCdnStatusWaiting,
77-
SourceFileLength: IllegalSourceFileLen,
78-
}
66+
newTask := types.NewSeedTask(taskID, request.Header, request.Digest, request.URL, taskURL)
7967
// using the existing task if it already exists corresponding to taskID
8068
if v, err := tm.taskStore.Get(taskID); err == nil {
8169
span.SetAttributes(config.AttributeIfReuseTask.Bool(true))
@@ -92,7 +80,7 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegis
9280
task = newTask
9381
}
9482

95-
if task.SourceFileLength != IllegalSourceFileLen {
83+
if task.SourceFileLength != types.IllegalSourceFileLen {
9684
return task, nil
9785
}
9886

@@ -102,7 +90,7 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegis
10290
span.AddEvent(config.EventRequestSourceFileLength)
10391
sourceFileLength, err := source.GetContentLength(ctx, task.URL, request.Header)
10492
if err != nil {
105-
logger.WithTaskID(task.TaskID).Errorf("failed to get url (%s) content length: %v", task.URL, err)
93+
task.Log().Errorf("failed to get url (%s) content length: %v", task.URL, err)
10694
if cdnerrors.IsURLNotReachable(err) {
10795
tm.taskURLUnReachableStore.Add(taskID, time.Now())
10896
return nil, err

cdnsystem/types/seed_task_info.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616

1717
package types
1818

19+
import (
20+
logger "d7y.io/dragonfly/v2/internal/dflog"
21+
)
22+
1923
type SeedTask struct {
2024
TaskID string `json:"taskId,omitempty"`
2125
URL string `json:"url,omitempty"`
@@ -29,6 +33,24 @@ type SeedTask struct {
2933
RequestDigest string `json:"requestDigest,omitempty"`
3034
SourceRealDigest string `json:"sourceRealDigest,omitempty"`
3135
PieceMd5Sign string `json:"pieceMd5Sign,omitempty"`
36+
logger *logger.SugaredLoggerOnWith
37+
}
38+
39+
const (
40+
IllegalSourceFileLen = -100
41+
)
42+
43+
func NewSeedTask(taskID string, header map[string]string, digest string, url string, taskURL string) *SeedTask {
44+
return &SeedTask{
45+
TaskID: taskID,
46+
Header: header,
47+
RequestDigest: digest,
48+
URL: url,
49+
TaskURL: taskURL,
50+
CdnStatus: TaskInfoCdnStatusWaiting,
51+
SourceFileLength: IllegalSourceFileLen,
52+
logger: logger.WithTaskID(taskID),
53+
}
3254
}
3355

3456
// IsSuccess determines that whether the CDNStatus is success.
@@ -67,6 +89,13 @@ func (task *SeedTask) UpdateTaskInfo(cdnStatus, realDigest, pieceMd5Sign string,
6789
task.CdnFileLength = cdnFileLength
6890
}
6991

92+
func (task *SeedTask) Log() *logger.SugaredLoggerOnWith {
93+
if task.logger == nil {
94+
task.logger = logger.WithTaskID(task.TaskID)
95+
}
96+
return task.logger
97+
}
98+
7099
const (
71100

72101
// TaskInfoCdnStatusWaiting captures enum value "WAITING"

pkg/source/httpprotocol/http_source_client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ import (
2626
"os"
2727
"time"
2828

29+
"d7y.io/dragonfly/v2/cdnsystem/types"
2930
"d7y.io/dragonfly/v2/pkg/util/rangeutils"
3031

3132
"github.com/go-http-utils/headers"
3233

33-
"d7y.io/dragonfly/v2/cdnsystem/supervisor/task"
3434
"d7y.io/dragonfly/v2/pkg/source"
3535
"d7y.io/dragonfly/v2/pkg/structure/maputils"
3636
"d7y.io/dragonfly/v2/pkg/util/stringutils"
@@ -113,7 +113,7 @@ func (client *httpSourceClient) GetContentLength(ctx context.Context, url string
113113
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
114114
// TODO Whether this situation should be distinguished from the err situation,
115115
//similar to proposing another error type to indicate that this error can interact with the URL, but the status code does not meet expectations
116-
return task.IllegalSourceFileLen, fmt.Errorf("get http resource length failed, unexpected code: %d", resp.StatusCode)
116+
return types.IllegalSourceFileLen, fmt.Errorf("get http resource length failed, unexpected code: %d", resp.StatusCode)
117117
}
118118
return resp.ContentLength, nil
119119
}

scheduler/supervisor/host.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,5 +130,16 @@ func (h *PeerHost) DecUploadLoad() int32 {
130130
}
131131

132132
func (h *PeerHost) Log() *logger.SugaredLoggerOnWith {
133+
h.lock.RLock()
134+
if h.logger != nil {
135+
h.lock.RUnlock()
136+
return h.logger
137+
}
138+
h.lock.RUnlock()
139+
h.lock.Lock()
140+
defer h.lock.Unlock()
141+
if h.logger == nil {
142+
h.logger = logger.WithTaskID(h.UUID)
143+
}
133144
return h.logger
134145
}

scheduler/supervisor/peer.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,17 @@ func (peer *Peer) CloseChannel(err error) error {
344344
}
345345

346346
func (peer *Peer) Log() *logger.SugaredLoggerOnWith {
347+
peer.lock.RLock()
348+
if peer.logger != nil {
349+
peer.lock.RUnlock()
350+
return peer.logger
351+
}
352+
peer.lock.RUnlock()
353+
peer.lock.Lock()
354+
defer peer.lock.Unlock()
355+
if peer.logger == nil {
356+
peer.logger = logger.WithTaskID(peer.PeerID)
357+
}
347358
return peer.logger
348359
}
349360

0 commit comments

Comments
 (0)