Skip to content
This repository was archived by the owner on Dec 20, 2024. It is now read-only.

Commit 18b652c

Browse files
authored
Merge pull request #1264 from antsystem/feat/extract-dfget-lib-prepare-pr
add stream mode which do not up dfget progress.
2 parents aa709f6 + ce27e84 commit 18b652c

File tree

10 files changed

+194
-38
lines changed

10 files changed

+194
-38
lines changed

ROADMAP.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ We will try to finish it before **December 30, 2019**.
7070
As a cloud native project, we should do more work to support deploy the `Dragonfly` on the kubernets platform. Including but not limited to the following list:
7171

7272
* Deploy `supernode` using [Helm](https://github.com/helm/helm) in Kubernetes to simplify the complexity of scaling SuperNodes in Kubernetes.
73-
* Deploy `supernode` cluster using [Operator](https://coreos.com/operators/).
73+
* Deploy `supernode` cluster using Operator.
7474
* Deploy `dfget & dfdaemon` using DaemonSet in Kubernetes.
7575

7676
Related issue: [#346](https://github.com/dragonflyoss/Dragonfly/issues/346)

cmd/dfdaemon/app/init.go

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,23 +28,58 @@ import (
2828

2929
"github.com/dragonflyoss/Dragonfly/dfdaemon/config"
3030
"github.com/dragonflyoss/Dragonfly/dfdaemon/constant"
31+
dfgetcfg "github.com/dragonflyoss/Dragonfly/dfget/config"
32+
"github.com/dragonflyoss/Dragonfly/dfget/util"
3133
"github.com/dragonflyoss/Dragonfly/pkg/dflog"
3234
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
3335
"github.com/dragonflyoss/Dragonfly/pkg/fileutils"
36+
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
37+
"github.com/dragonflyoss/Dragonfly/pkg/netutils"
3438
statutil "github.com/dragonflyoss/Dragonfly/pkg/stat"
39+
"github.com/dragonflyoss/Dragonfly/pkg/stringutils"
3540

3641
"github.com/pkg/errors"
3742
"github.com/sirupsen/logrus"
3843
)
3944

45+
// adjustSupernodeList adjusts the super nodes [a,b] to [a,b,b,a]
46+
func adjustSupernodeList(nodes []string) []string {
47+
switch nodesLen := len(nodes); nodesLen {
48+
case 0:
49+
return nodes
50+
case 1:
51+
return append(nodes, nodes[0])
52+
default:
53+
util.Shuffle(nodesLen, func(i, j int) {
54+
nodes[i], nodes[j] = nodes[j], nodes[i]
55+
})
56+
return append(nodes, nodes...)
57+
}
58+
}
59+
60+
// getLocalIP return the localIP which connects to supper node
61+
func getLocalIP(nodes []string) (localIP string) {
62+
var (
63+
e error
64+
)
65+
for _, n := range nodes {
66+
ip, port := netutils.GetIPAndPortFromNode(n, dfgetcfg.DefaultSupernodePort)
67+
if localIP, e = httputils.CheckConnect(ip, port, 1000); e == nil {
68+
return localIP
69+
}
70+
logrus.Warnf("Connect to node:%s error: %v", n, e)
71+
}
72+
return ""
73+
}
74+
4075
// initDfdaemon sets up running environment for dfdaemon according to the given config.
41-
func initDfdaemon(cfg config.Properties) error {
76+
func initDfdaemon(cfg *config.Properties) error {
4277
// if Options.MaxProcs <= 0, programs run with GOMAXPROCS set to the number of cores available.
4378
if cfg.MaxProcs > 0 {
4479
runtime.GOMAXPROCS(cfg.MaxProcs)
4580
}
4681

47-
if err := initLogger(cfg); err != nil {
82+
if err := initLogger(*cfg); err != nil {
4883
return errors.Wrap(err, "init logger")
4984
}
5085

@@ -58,14 +93,20 @@ func initDfdaemon(cfg config.Properties) error {
5893
"ensure local repo %s exists", cfg.DFRepo,
5994
)
6095
}
96+
cfg.SuperNodes = adjustSupernodeList(cfg.SuperNodes)
97+
if stringutils.IsEmptyStr(cfg.LocalIP) {
98+
cfg.LocalIP = getLocalIP(cfg.SuperNodes)
99+
}
61100

62101
go cleanLocalRepo(cfg.DFRepo)
63102

64-
dfgetVersion, err := exec.Command(cfg.DFPath, "version").CombinedOutput()
65-
if err != nil {
66-
return errors.Wrap(err, "get dfget version")
103+
if !cfg.StreamMode {
104+
dfgetVersion, err := exec.Command(cfg.DFPath, "version").CombinedOutput()
105+
if err != nil {
106+
return errors.Wrap(err, "get dfget version")
107+
}
108+
logrus.Infof("use %s from %s", bytes.TrimSpace(dfgetVersion), cfg.DFPath)
67109
}
68-
logrus.Infof("use %s from %s", bytes.TrimSpace(dfgetVersion), cfg.DFPath)
69110

70111
return nil
71112
}

cmd/dfdaemon/app/root.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ var rootCmd = &cobra.Command{
6161
return errors.Wrap(err, "get config from viper")
6262
}
6363

64-
if err := initDfdaemon(*cfg); err != nil {
64+
if err := initDfdaemon(cfg); err != nil {
6565
return errors.Wrap(err, "init dfdaemon")
6666
}
6767

@@ -72,6 +72,10 @@ var rootCmd = &cobra.Command{
7272
if err != nil {
7373
return errors.Wrap(err, "create dfdaemon from config")
7474
}
75+
// if stream mode, launch peer server in dfdaemon progress
76+
if cfg.StreamMode {
77+
go dfdaemon.LaunchPeerServer(*cfg)
78+
}
7579
return s.Start()
7680
},
7781
}
@@ -93,6 +97,8 @@ func init() {
9397
// http server config
9498
rf.String("hostIp", "127.0.0.1", "dfdaemon host ip, default: 127.0.0.1")
9599
rf.Uint("port", 65001, "dfdaemon will listen the port")
100+
rf.Uint("peerPort", 0, "peerserver will listen the port")
101+
rf.Bool("streamMode", false, "dfdaemon will run in stream mode")
96102
rf.String("certpem", "", "cert.pem file path")
97103
rf.String("keypem", "", "key.pem file path")
98104

dfdaemon/config/config.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"crypto/x509"
2222
"encoding/json"
2323
"net/url"
24-
"os"
2524
"path/filepath"
2625
"regexp"
2726

@@ -111,7 +110,10 @@ type Properties struct {
111110
DFRepo string `yaml:"localrepo" json:"localrepo"`
112111
DFPath string `yaml:"dfpath" json:"dfpath"`
113112

114-
LogConfig dflog.LogConfig `yaml:"logConfig" json:"logConfig"`
113+
LogConfig dflog.LogConfig `yaml:"logConfig" json:"logConfig"`
114+
LocalIP string `yaml:"localIP" json:"localIP"`
115+
PeerPort int `yaml:"peerPort" json:"peerPort"`
116+
StreamMode bool `yaml:"streamMode" json:"streamMode"`
115117
}
116118

117119
// Validate validates the config
@@ -130,13 +132,6 @@ func (p *Properties) Validate() error {
130132
)
131133
}
132134

133-
if _, err := os.Stat(p.DFPath); err != nil && os.IsNotExist(err) {
134-
return dferr.Newf(
135-
constant.CodeExitDfgetNotFound,
136-
"dfpath %s not found", p.DFPath,
137-
)
138-
}
139-
140135
return nil
141136
}
142137

@@ -156,6 +151,8 @@ func (p *Properties) DFGetConfig() DFGetConfig {
156151
RateLimit: p.RateLimit.String(),
157152
DFRepo: p.DFRepo,
158153
DFPath: p.DFPath,
154+
LocalIP: p.LocalIP,
155+
PeerPort: p.PeerPort,
159156
}
160157
if p.HijackHTTPS != nil {
161158
dfgetConfig.HostsConfig = p.HijackHTTPS.Hosts
@@ -181,6 +178,8 @@ type DFGetConfig struct {
181178
DFRepo string `yaml:"localrepo"`
182179
DFPath string `yaml:"dfpath"`
183180
HostsConfig []*HijackHost `yaml:"hosts" json:"hosts"`
181+
PeerPort int `yaml:"peerPort"`
182+
LocalIP string `yaml:"localIP"`
184183
}
185184

186185
// RegistryMirror configures the mirror of the official docker registry

dfdaemon/config/config_test.go

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,9 @@ import (
2020
"crypto/x509"
2121
"encoding/json"
2222
"fmt"
23-
"math/rand"
2423
"os"
2524
"strings"
2625
"testing"
27-
"time"
2826

2927
"github.com/dragonflyoss/Dragonfly/dfdaemon/constant"
3028
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
@@ -85,17 +83,6 @@ func (ts *configTestSuite) TestValidateDFRepo() {
8583
r.Equal(constant.CodeExitPathNotAbs, getCode(c.Validate()))
8684
}
8785

88-
func (ts *configTestSuite) TestValidateDFPath() {
89-
c := defaultConfig()
90-
r := ts.Require()
91-
92-
c.DFPath = "/"
93-
r.Nil(c.Validate())
94-
95-
c.DFPath = fmt.Sprintf("/df-test-%d-%d", time.Now().UnixNano(), rand.Int())
96-
r.Equal(constant.CodeExitDfgetNotFound, getCode(c.Validate()))
97-
}
98-
9986
func (ts *configTestSuite) TestURLNew() {
10087
r := ts.Require()
10188

dfdaemon/downloader/downloader.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616

1717
package downloader
1818

19-
import "context"
19+
import (
20+
"context"
21+
"io"
22+
)
2023

2124
// Interface specifies on how an plugin can download a file.
2225
type Interface interface {
@@ -25,5 +28,12 @@ type Interface interface {
2528
DownloadContext(ctx context.Context, url string, header map[string][]string, name string) (string, error)
2629
}
2730

31+
type Stream interface {
32+
// DownloadContext downloads the resource as specified in url, and it accepts
33+
// a context parameter so that it can handle timeouts correctly.
34+
DownloadStreamContext(ctx context.Context, url string, header map[string][]string, name string) (io.Reader, error)
35+
}
36+
2837
// Factory is a function that returns a new downloader.
2938
type Factory func() Interface
39+
type StreamFactory func() Stream
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright The Dragonfly Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package p2p
18+
19+
import (
20+
"context"
21+
"errors"
22+
"io"
23+
24+
"github.com/dragonflyoss/Dragonfly/dfdaemon/config"
25+
)
26+
27+
type Client struct {
28+
}
29+
30+
func (c *Client) DownloadContext(ctx context.Context, url string, header map[string][]string, name string) (string, error) {
31+
return "", errors.New("Not Implementation")
32+
}
33+
34+
func (c *Client) DownloadStreamContext(ctx context.Context, url string, header map[string][]string, name string) (io.Reader, error) {
35+
return nil, errors.New("Not Implementation")
36+
}
37+
38+
func NewClient(cfg config.DFGetConfig) *Client {
39+
return &Client{}
40+
}

dfdaemon/proxy/proxy.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/dragonflyoss/Dragonfly/dfdaemon/config"
3232
"github.com/dragonflyoss/Dragonfly/dfdaemon/downloader"
3333
"github.com/dragonflyoss/Dragonfly/dfdaemon/downloader/dfget"
34+
"github.com/dragonflyoss/Dragonfly/dfdaemon/downloader/p2p"
3435
"github.com/dragonflyoss/Dragonfly/dfdaemon/transport"
3536
"github.com/golang/groupcache/lru"
3637
"github.com/pkg/errors"
@@ -107,6 +108,20 @@ func WithDownloaderFactory(f downloader.Factory) Option {
107108
}
108109
}
109110

111+
func WithStreamMode(streamMode bool) Option {
112+
return func(p *Proxy) error {
113+
p.streamMode = streamMode
114+
return nil
115+
}
116+
}
117+
118+
func WithStreamDownloaderFactory(f downloader.StreamFactory) Option {
119+
return func(p *Proxy) error {
120+
p.streamDownloadFactory = f
121+
return nil
122+
}
123+
}
124+
110125
// New returns a new transparent proxy with the given rules
111126
func New(opts ...Option) (*Proxy, error) {
112127
proxy := &Proxy{
@@ -130,6 +145,10 @@ func NewFromConfig(c config.Properties) (*Proxy, error) {
130145
WithDownloaderFactory(func() downloader.Interface {
131146
return dfget.NewGetter(c.DFGetConfig())
132147
}),
148+
WithStreamDownloaderFactory(func() downloader.Stream {
149+
return p2p.NewClient(c.DFGetConfig())
150+
}),
151+
WithStreamMode(c.StreamMode),
133152
}
134153

135154
logrus.Infof("registry mirror: %s", c.RegistryMirror.Remote)
@@ -179,7 +198,9 @@ type Proxy struct {
179198
// directHandler are used to handle non proxy requests
180199
directHandler http.Handler
181200
// downloadFactory returns the downloader used for p2p downloading
182-
downloadFactory downloader.Factory
201+
downloadFactory downloader.Factory
202+
streamDownloadFactory downloader.StreamFactory
203+
streamMode bool
183204
}
184205

185206
func (proxy *Proxy) mirrorRegistry(w http.ResponseWriter, r *http.Request) {
@@ -247,7 +268,7 @@ func (proxy *Proxy) handleHTTP(w http.ResponseWriter, req *http.Request) {
247268

248269
func (proxy *Proxy) roundTripper(tlsConfig *tls.Config) http.RoundTripper {
249270
rt, _ := transport.New(
250-
transport.WithDownloader(proxy.downloadFactory()),
271+
transport.WithStreamDownloader(proxy.streamDownloadFactory()),
251272
transport.WithTLS(tlsConfig),
252273
transport.WithCondition(proxy.shouldUseDfget),
253274
)

dfdaemon/server.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525
"github.com/dragonflyoss/Dragonfly/dfdaemon/config"
2626
"github.com/dragonflyoss/Dragonfly/dfdaemon/handler"
2727
"github.com/dragonflyoss/Dragonfly/dfdaemon/proxy"
28+
dfgetConfig "github.com/dragonflyoss/Dragonfly/dfget/config"
29+
"github.com/dragonflyoss/Dragonfly/dfget/core/uploader"
2830
"github.com/dragonflyoss/Dragonfly/version"
2931

3032
"github.com/pkg/errors"
@@ -115,6 +117,19 @@ func NewFromConfig(cfg config.Properties) (*Server, error) {
115117
return New(opts...)
116118
}
117119

120+
func LaunchPeerServer(cfg config.Properties) error {
121+
peerServerConfig := dfgetConfig.NewConfig()
122+
peerServerConfig.RV.LocalIP = cfg.LocalIP
123+
peerServerConfig.RV.PeerPort = cfg.PeerPort
124+
peerServerConfig.RV.ServerAliveTime = 0
125+
port, err := uploader.LaunchPeerServer(peerServerConfig)
126+
if err != nil {
127+
return err
128+
}
129+
peerServerConfig.RV.PeerPort = port
130+
return nil
131+
}
132+
118133
// Start runs dfdaemon's http server.
119134
func (s *Server) Start() error {
120135
var err error

0 commit comments

Comments
 (0)