Skip to content
This repository was archived by the owner on Dec 20, 2024. It is now read-only.

Commit 6bb4262

Browse files
committed
feature: implement the dfgetTaskMgr
Signed-off-by: Starnop <[email protected]>
1 parent 578ac41 commit 6bb4262

4 files changed

Lines changed: 216 additions & 8 deletions

File tree

apis/swagger.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,12 @@ definitions:
827827
description: |
828828
The status of Dfget download process.
829829
enum: ["WAITING", "RUNNING", "FAILED", "SUCCESS",]
830+
peerID:
831+
type: "string"
832+
description: |
833+
PeerID uniquely identifies a peer, and the cID uniquely identifies a
834+
download task belonging to a peer. One peer can initiate multiple download tasks,
835+
which means that one peer corresponds to multiple cIDs.
830836
831837
ErrorResponse:
832838
type: "object"

apis/types/df_get_task.go

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

supernode/daemon/mgr/dfget_task_mgr.go

Lines changed: 119 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,143 @@ package mgr
22

33
import (
44
"context"
5+
"fmt"
56

67
"github.com/dragonflyoss/Dragonfly/apis/types"
8+
cutil "github.com/dragonflyoss/Dragonfly/common/util"
9+
errorType "github.com/dragonflyoss/Dragonfly/supernode/errors"
10+
11+
"github.com/pkg/errors"
712
)
813

914
// DfgetTaskMgr as an interface defines all operations against DfgetTask.
1015
// A DfgetTask represents a download process initiated by dfget or other clients.
1116
type DfgetTaskMgr interface {
12-
// Add a new dfgetTask, we use clientID to identify a dfgetTask uniquely.
17+
// Add a new dfgetTask, we use clientID and taskID to identify a dfgetTask uniquely.
1318
// ClientID should be generated by dfget, supernode will use it directly.
1419
// NOTE: We should create a new dfgetTask for each download process,
1520
// even if the downloads initiated by the same machine.
1621
Add(ctx context.Context, dfgetTask *types.DfGetTask) error
1722

18-
// Get get a dfgetTask info with specified clientID.
19-
Get(ctx context.Context, clientID string) (dfgetTask *types.DfGetTask, err error)
23+
// Get a dfgetTask info with specified clientID and taskID.
24+
Get(ctx context.Context, clientID, taskID string) (dfgetTask *types.DfGetTask, err error)
2025

21-
// List return the list of dfgetTask.
26+
// List returns the list of dfgetTask.
2227
List(ctx context.Context, filter map[string]string) (dfgetTaskList []*types.DfGetTask, err error)
2328

24-
// Delete a dfgetTask with clientID.
25-
Delete(ctx context.Context, clientID string) error
29+
// Delete a dfgetTask with clientID and taskID.
30+
Delete(ctx context.Context, clientID, taskID string) error
2631

27-
// UpdateStatus update the status of dfgetTask with specified clientID.
32+
// UpdateStatus update the status of dfgetTask with specified clientID and taskID.
2833
// Supernode will update the status of dfgetTask in the following situations:
2934
// 1. after init the dfgetTask
3035
// 2. when success/fail to download some pieces
3136
// 3. when the entire download process ends in success or failure
32-
UpdateStatus(ctx context.Context, clientID string, status string) error
37+
UpdateStatus(ctx context.Context, clientID, taskID, status string) error
38+
}
39+
40+
// DfgetTaskManager is an implementation of the interface of DfgetTaskMgr.
41+
type DfgetTaskManager struct {
42+
dfgetTaskStore *Store
43+
}
44+
45+
// NewDfgetTaskManager returns a new DfgetTaskManager.
46+
func NewDfgetTaskManager() (*DfgetTaskManager, error) {
47+
return &DfgetTaskManager{
48+
dfgetTaskStore: NewStore(),
49+
}, nil
50+
}
51+
52+
// Add a new dfgetTask, we use clientID and taskID to identify a dfgetTask uniquely.
53+
// ClientID should be generated by dfget, supernode will use it directly.
54+
// NOTE: We should create a new dfgetTask for each download process,
55+
// even if the downloads initiated by the same machine.
56+
func (dtm *DfgetTaskManager) Add(ctx context.Context, dfgetTask *types.DfGetTask) error {
57+
if cutil.IsEmptyStr(dfgetTask.Path) {
58+
return errors.Wrapf(errorType.ErrEmptyValue, "Path")
59+
}
60+
61+
if cutil.IsEmptyStr(dfgetTask.PeerID) {
62+
return errors.Wrapf(errorType.ErrEmptyValue, "PeerID")
63+
}
64+
65+
key, err := generateKey(dfgetTask.CID, dfgetTask.TaskID)
66+
if err != nil {
67+
return err
68+
}
69+
70+
// the default status of DfgetTask is WAITING
71+
if cutil.IsEmptyStr(dfgetTask.Status) {
72+
dfgetTask.Status = types.DfGetTaskStatusWAITING
73+
}
74+
75+
// TODO: should we verify that the peerID is valid here.
76+
77+
dtm.dfgetTaskStore.Put(key, dfgetTask)
78+
return nil
79+
}
80+
81+
// Get a dfgetTask info with specified clientID and taskID.
82+
func (dtm *DfgetTaskManager) Get(ctx context.Context, clientID, taskID string) (dfgetTask *types.DfGetTask, err error) {
83+
return dtm.getDfgetTask(clientID, taskID)
84+
}
85+
86+
// List returns the list of dfgetTask.
87+
func (dtm *DfgetTaskManager) List(ctx context.Context, filter map[string]string) (dfgetTaskList []*types.DfGetTask, err error) {
88+
return nil, nil
89+
}
90+
91+
// Delete a dfgetTask with clientID and taskID.
92+
func (dtm *DfgetTaskManager) Delete(ctx context.Context, clientID, taskID string) error {
93+
key, err := generateKey(clientID, taskID)
94+
if err != nil {
95+
return err
96+
}
97+
98+
return dtm.dfgetTaskStore.Delete(key)
99+
}
100+
101+
// UpdateStatus update the status of dfgetTask with specified clientID and taskID.
102+
func (dtm *DfgetTaskManager) UpdateStatus(ctx context.Context, clientID, taskID, status string) error {
103+
dfgetTask, err := dtm.getDfgetTask(clientID, taskID)
104+
if err != nil {
105+
return err
106+
}
107+
108+
if dfgetTask.Status != types.DfGetTaskStatusSUCCESS {
109+
dfgetTask.Status = status
110+
}
111+
112+
return nil
113+
}
114+
115+
// getDfgetTask gets a DfGetTask from dfgetTaskStore with specified clientID and taskID.
116+
func (dtm *DfgetTaskManager) getDfgetTask(clientID, taskID string) (*types.DfGetTask, error) {
117+
key, err := generateKey(clientID, taskID)
118+
if err != nil {
119+
return nil, err
120+
}
121+
122+
v, err := dtm.dfgetTaskStore.Get(key)
123+
if err != nil {
124+
return nil, err
125+
}
126+
127+
if dfgetTask, ok := v.(*types.DfGetTask); ok {
128+
return dfgetTask, nil
129+
}
130+
return nil, errors.Wrapf(errorType.ErrConvertFailed, "clientID: %s, taskID: %s: %v", clientID, taskID, v)
131+
}
132+
133+
// generateKey gengerates a key for a dfgetTask.
134+
func generateKey(cID, taskID string) (string, error) {
135+
if cutil.IsEmptyStr(cID) {
136+
return "", errors.Wrapf(errorType.ErrEmptyValue, "cID")
137+
}
138+
139+
if cutil.IsEmptyStr(taskID) {
140+
return "", errors.Wrapf(errorType.ErrEmptyValue, "taskID")
141+
}
142+
143+
return fmt.Sprintf("%s%s%s", cID, "@", taskID), nil
33144
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright The Dragonfly Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package mgr
18+
19+
import (
20+
"context"
21+
22+
"github.com/dragonflyoss/Dragonfly/apis/types"
23+
"github.com/dragonflyoss/Dragonfly/supernode/errors"
24+
25+
"github.com/go-check/check"
26+
)
27+
28+
func init() {
29+
check.Suite(&DfgetTaskMgrTestSuite{})
30+
}
31+
32+
type DfgetTaskMgrTestSuite struct {
33+
}
34+
35+
func (s *DfgetTaskMgrTestSuite) TestDfgetTaskMgr(c *check.C) {
36+
dfgetTaskManager, _ := NewDfgetTaskManager()
37+
clientID := "foo"
38+
taskID := "00c4e7b174af7ed61c414b36ef82810ac0c98142c03e5748c00e1d1113f3c882"
39+
40+
// Add
41+
dfgetTask := &types.DfGetTask{
42+
CID: clientID,
43+
Path: "/peer/file/taskFileName",
44+
PieceSize: 4 * 1024 * 1024,
45+
TaskID: taskID,
46+
PeerID: "foo-192.168.10.11-1553838710990554281",
47+
}
48+
49+
err := dfgetTaskManager.Add(context.Background(), dfgetTask)
50+
c.Check(err, check.IsNil)
51+
52+
// Get
53+
dt, err := dfgetTaskManager.Get(context.Background(), clientID, taskID)
54+
c.Check(err, check.IsNil)
55+
c.Check(dt, check.DeepEquals, &types.DfGetTask{
56+
CID: clientID,
57+
Path: "/peer/file/taskFileName",
58+
PieceSize: 4 * 1024 * 1024,
59+
TaskID: taskID,
60+
Status: types.DfGetTaskStatusWAITING,
61+
PeerID: "foo-192.168.10.11-1553838710990554281",
62+
})
63+
64+
// UpdateStatus
65+
err = dfgetTaskManager.UpdateStatus(context.Background(), clientID, taskID, types.DfGetTaskStatusSUCCESS)
66+
c.Check(err, check.IsNil)
67+
68+
dt, err = dfgetTaskManager.Get(context.Background(), clientID, taskID)
69+
c.Check(err, check.IsNil)
70+
c.Check(dt, check.DeepEquals, &types.DfGetTask{
71+
CID: clientID,
72+
Path: "/peer/file/taskFileName",
73+
PieceSize: 4 * 1024 * 1024,
74+
TaskID: taskID,
75+
Status: types.DfGetTaskStatusSUCCESS,
76+
PeerID: "foo-192.168.10.11-1553838710990554281",
77+
})
78+
79+
// Delete
80+
err = dfgetTaskManager.Delete(context.Background(), clientID, taskID)
81+
c.Check(err, check.IsNil)
82+
83+
dt, err = dfgetTaskManager.Get(context.Background(), clientID, taskID)
84+
c.Check(errors.IsDataNotFound(err), check.Equals, true)
85+
}

0 commit comments

Comments
 (0)