Skip to content
This repository was archived by the owner on Dec 20, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cmd/dfdaemon/app/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/dragonflyoss/Dragonfly/dfdaemon/config"
"github.com/dragonflyoss/Dragonfly/dfdaemon/constant"
dfgetcfg "github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/dfget/core/api"
"github.com/dragonflyoss/Dragonfly/pkg/algorithm"
"github.com/dragonflyoss/Dragonfly/pkg/dflog"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
Expand Down Expand Up @@ -62,8 +63,23 @@ func getLocalIP(nodes []string) (localIP string) {
var (
e error
)
supernodeAPI := api.NewSupernodeAPI()
for _, n := range nodes {
ip, port := netutils.GetIPAndPortFromNode(n, dfgetcfg.DefaultSupernodePort)
// step 1. query supernode api get request ip, check if request ip in local eth IPs
if localIP, e = supernodeAPI.Ping(fmt.Sprintf("%s:%d", ip, port)); e == nil {
logrus.Infof("Connect to supernode get self ip:%s", localIP)
if localIPs, err := netutils.GetAllIPs(); err == nil {
for _, lip := range localIPs {
if localIP == lip {
return localIP
}
}
}
} else {
logrus.Warnf("Connect to supernode:%s error: %v", n, e)
}
// step 2. if request ip not in eth ips, it might behind NAT, try use TCP conn to get localIP
if localIP, e = httputils.CheckConnect(ip, port, 1000); e == nil {
return localIP
}
Expand Down
20 changes: 20 additions & 0 deletions dfget/core/api/supernode_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

/* the url paths of supernode APIs*/
const (
pingPath = "/_ping"
peerRegisterPath = "/peer/registry"
peerPullPieceTaskPath = "/peer/task"
peerReportPiecePath = "/peer/piece/suc"
Expand All @@ -53,6 +54,7 @@ func NewSupernodeAPI() SupernodeAPI {

// SupernodeAPI defines the communication methods between supernode and dfget.
type SupernodeAPI interface {
Ping(node string) (reqIP string, e error)
Register(node string, req *types.RegisterRequest) (resp *types.RegisterResponse, e error)
PullPieceTask(node string, req *types.PullPieceTaskRequest) (resp *types.PullPieceTaskResponse, e error)
ReportPiece(node string, req *types.ReportPieceRequest) (resp *types.BaseResponse, e error)
Expand All @@ -74,6 +76,24 @@ type supernodeAPI struct {

var _ SupernodeAPI = &supernodeAPI{}

// Ping sends a request to the supernode to check if suppernode is ok
// and get request ip from supernode.
func (api *supernodeAPI) Ping(node string) (reqIP string, e error) {
var (
code int
body []byte
)
url := fmt.Sprintf("%s://%s%s",
api.Scheme, node, pingPath)
if code, body, e = api.HTTPClient.Get(url, api.Timeout); e != nil {
return "", e
}
if !httputils.HTTPStatusOk(code) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the StatusOK(200) the only case to judge whether a node could be pinged?

I don't think we should use HTTP protocol (which is level 7) to test the connection.

return "", fmt.Errorf("%d:%s", code, body)
}
return string(body), e
}

// Register sends a request to the supernode to register itself as a peer
// and create downloading task.
func (api *supernodeAPI) Register(node string, req *types.RegisterRequest) (
Expand Down
5 changes: 5 additions & 0 deletions dfget/core/api/supernode_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ func init() {

// ----------------------------------------------------------------------------
// unit tests for SupernodeAPI
func (s *SupernodeAPITestSuite) TestSupernodeAPI_Ping(c *check.C) {
r, e := s.api.Ping(localhost)
c.Check(r, check.Equals, "")
c.Check(e.Error(), check.Equals, "0:")
}

func (s *SupernodeAPITestSuite) TestSupernodeAPI_Register(c *check.C) {
s.mock.PostJSONFunc = s.mock.CreatePostJSONFunc(0, nil, nil)
Expand Down
15 changes: 15 additions & 0 deletions dfget/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,23 @@ func checkConnectSupernode(locator locator.SupernodeLocator) (localIP string) {
if locator == nil {
return ""
}
supernodeAPI := api.NewSupernodeAPI()
for _, group := range locator.All() {
for _, n := range group.Nodes {
// step 1. query supernode api get request ip, check if request ip in local eth IPs
if localIP, e = supernodeAPI.Ping(n.String()); e == nil {
logrus.Infof("Connect to supernode get self ip:%s", localIP)
if localIPs, err := netutils.GetAllIPs(); err == nil {
for _, lip := range localIPs {
if localIP == lip {
return localIP
}
}
}
} else {
logrus.Warnf("Connect to supernode:%s error: %v", n, e)
}
// step 2. if request ip not in eth ips, it might behind NAT, try use TCP conn to get localIP
if localIP, e = httputils.CheckConnect(n.IP, n.Port, 1000); e == nil {
return localIP
}
Expand Down
11 changes: 11 additions & 0 deletions dfget/core/helper/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ func CreateRandomString(cap int) string {
// ----------------------------------------------------------------------------
// MockSupernodeAPI

type PingFuncType func(ip string) (string, error)

// RegisterFuncType function type of SupernodeAPI#Register
type RegisterFuncType func(ip string, req *types.RegisterRequest) (*types.RegisterResponse, error)

Expand All @@ -123,6 +125,7 @@ type ReportMetricsFuncType func(node string, req *api_types.TaskMetricsRequest)

// MockSupernodeAPI mocks the SupernodeAPI.
type MockSupernodeAPI struct {
PingFunc PingFuncType
RegisterFunc RegisterFuncType
PullFunc PullFuncType
ReportFunc ReportFuncType
Expand All @@ -133,6 +136,14 @@ type MockSupernodeAPI struct {

var _ api.SupernodeAPI = &MockSupernodeAPI{}

// Ping implements SupernodeAPI#Ping.
func (m *MockSupernodeAPI) Ping(node string) (reqIP string, err error) {
if m.PingFunc != nil {
return m.PingFunc(node)
}
return "127.0.0.1", nil
}

// Register implements SupernodeAPI#Register.
func (m *MockSupernodeAPI) Register(ip string, req *types.RegisterRequest) (
*types.RegisterResponse, error) {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
github.com/magiconair/properties v1.8.1 // indirect
github.com/mailru/easyjson v0.0.0-20170902151237-2a92e673c9a6 // indirect
github.com/mitchellh/mapstructure v1.1.2
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/openacid/low v0.1.10
github.com/pborman/uuid v0.0.0-20180122190007-c65b2f87fee3
github.com/pkg/errors v0.8.0
Expand All @@ -45,6 +46,7 @@ require (
github.com/willf/bitset v0.0.0-20190228212526-18bd95f470f9
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 // indirect
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/gcfg.v1 v1.2.3
gopkg.in/mgo.v2 v2.0.0-20160818020120-3f83fa500528 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/openacid/errors v0.8.1/go.mod h1:GUQEJJOJE3W9skHm8E8Y4phdl2LLEN8iD7c5gcGgdx0=
github.com/openacid/low v0.1.10 h1:rKpmB5CHtKoPq9tFiqUvRk8vtWaPympL2D2dNfw3PvI=
Expand Down Expand Up @@ -236,6 +238,8 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/gcfg.v1 v1.2.3 h1:m8OOJ4ccYHnx2f4gQwpno8nAX5OGOh7RLaaz0pj3Ogs=
gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o=
gopkg.in/mgo.v2 v2.0.0-20160818020120-3f83fa500528 h1:/saqWwm73dLmuzbNhe92F0QsZ/KiFND+esHco2v1hiY=
Expand Down
7 changes: 6 additions & 1 deletion supernode/server/system_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@ package server
import (
"context"
"net/http"
"strings"
)

func (s *Server) ping(context context.Context, rw http.ResponseWriter, req *http.Request) (err error) {
remoteIP := ""
if idx := strings.LastIndexByte(req.RemoteAddr, ':'); idx >= 0 {
remoteIP = req.RemoteAddr[:idx]
}
rw.WriteHeader(http.StatusOK)
_, err = rw.Write([]byte{'O', 'K'})
_, err = rw.Write([]byte(remoteIP))
return err
}
4 changes: 2 additions & 2 deletions test/api_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func (s *APIMetricsSuite) TestHttpMetrics(c *check.C) {
// Get httpRequest counter value equals 1.
CheckMetric(c, fmt.Sprintf(requestCounter, 200, "/_ping"), 1)

// Get httpResponse size sum value equals 2.
CheckMetric(c, fmt.Sprintf(responseSizeSum, "/_ping"), 2)
// Get httpResponse size sum value equals 9(127.0.0.1).
CheckMetric(c, fmt.Sprintf(responseSizeSum, "/_ping"), 9)

// Get httpResponse size count value equals 1.
CheckMetric(c, fmt.Sprintf(responseSizeCount, "/_ping"), 1)
Expand Down