Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/capcli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ func (d *DownloadSnapshots) Run(ctx *Context) error {
if err != nil {
return err
}
downlo, err := downloader.New(ctx, downloaderCfg, dirs, log.Root(), log.LvlInfo, true)
downlo, err := downloader.New(ctx, downloaderCfg, log.Root(), log.LvlInfo, true)
if err != nil {
return err
}
Expand Down
146 changes: 128 additions & 18 deletions cmd/downloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"context"
"errors"
"fmt"
"io/fs"
"net"
"net/url"
"os"
"path/filepath"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -109,9 +112,18 @@ func init() {
rootCmd.AddCommand(torrentCat)
rootCmd.AddCommand(torrentMagnet)

withDataDir(torrentClean)
rootCmd.AddCommand(torrentClean)

withDataDir(manifestCmd)
withChainFlag(manifestCmd)
rootCmd.AddCommand(manifestCmd)

manifestVerifyCmd.Flags().StringVar(&webseeds, utils.WebSeedsFlag.Name, utils.WebSeedsFlag.Value, utils.WebSeedsFlag.Usage)
manifestVerifyCmd.PersistentFlags().BoolVar(&verifyFailfast, "verify.failfast", false, "Stop on first found error. Report it and exit")
withChainFlag(manifestVerifyCmd)
rootCmd.AddCommand(manifestVerifyCmd)

withDataDir(printTorrentHashes)
withChainFlag(printTorrentHashes)
printTorrentHashes.PersistentFlags().BoolVar(&forceRebuild, "rebuild", false, "Force re-create .torrent files")
Expand Down Expand Up @@ -216,7 +228,7 @@ func Downloader(ctx context.Context, logger log.Logger) error {

cfg.AddTorrentsFromDisk = true // always true unless using uploader - which wants control of torrent files

d, err := downloader.New(ctx, cfg, dirs, logger, log.LvlInfo, seedbox)
d, err := downloader.New(ctx, cfg, logger, log.LvlInfo, seedbox)
if err != nil {
return err
}
Expand Down Expand Up @@ -276,7 +288,7 @@ var printTorrentHashes = &cobra.Command{

var manifestCmd = &cobra.Command{
Use: "manifest",
Example: "go run ./cmd/downloader torrent_hashes --datadir <your_datadir>",
Example: "go run ./cmd/downloader manifest --datadir <your_datadir>",
RunE: func(cmd *cobra.Command, args []string) error {
logger := debug.SetupCobra(cmd, "downloader")
if err := manifest(cmd.Context(), logger); err != nil {
Expand All @@ -286,6 +298,18 @@ var manifestCmd = &cobra.Command{
},
}

var manifestVerifyCmd = &cobra.Command{
Use: "manifest-verify",
Example: "go run ./cmd/downloader manifest-verify --chain <chain> [--webseeds 'a','b','c']",
RunE: func(cmd *cobra.Command, args []string) error {
logger := debug.SetupCobra(cmd, "downloader")
if err := manifestVerify(cmd.Context(), logger); err != nil {
log.Error(err.Error())
}
return nil
},
}

var torrentCat = &cobra.Command{
Use: "torrent_cat",
Example: "go run ./cmd/downloader torrent_cat <path_to_torrent_file>",
Expand All @@ -308,6 +332,44 @@ var torrentCat = &cobra.Command{
return nil
},
}
var torrentClean = &cobra.Command{
Use: "torrent_clean",
Short: "Remove all .torrent files from datadir directory",
Example: "go run ./cmd/downloader torrent_clean --datadir=<datadir>",
RunE: func(cmd *cobra.Command, args []string) error {
dirs := datadir.New(datadirCli)

logger.Info("[snapshots.webseed] processing local file etags")
removedTorrents := 0
walker := func(path string, de fs.DirEntry, err error) error {
if err != nil || de.IsDir() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think you can get de nil ptr here - because err is not nil

if err != nil {
logger.Warn("[snapshots.torrent] walk and cleanup", "err", err, "path", path)
}
return nil //nolint
}

if !strings.HasSuffix(de.Name(), ".torrent") || strings.HasPrefix(de.Name(), ".") {
return nil
}
err = os.Remove(filepath.Join(dirs.Snap, path))
if err != nil {
logger.Warn("[snapshots.torrent] remove", "err", err, "path", path)
return err
}
removedTorrents++
return nil
}

sfs := os.DirFS(dirs.Snap)
if err := fs.WalkDir(sfs, ".", walker); err != nil {
return err
}
logger.Info("[snapshots.torrent] cleanup finished", "count", removedTorrents)
return nil
},
}

var torrentMagnet = &cobra.Command{
Use: "torrent_magnet",
Example: "go run ./cmd/downloader torrent_magnet <path_to_torrent_file>",
Expand All @@ -325,49 +387,97 @@ var torrentMagnet = &cobra.Command{
},
}

func manifestVerify(ctx context.Context, logger log.Logger) error {
webseedsList := common.CliString2Array(webseeds)
if known, ok := snapcfg.KnownWebseeds[chain]; ok {
webseedsList = append(webseedsList, known...)
}

webseedUrlsOrFiles := webseedsList
webseedHttpProviders := make([]*url.URL, 0, len(webseedUrlsOrFiles))
webseedFileProviders := make([]string, 0, len(webseedUrlsOrFiles))
for _, webseed := range webseedUrlsOrFiles {
if !strings.HasPrefix(webseed, "v") { // has marker v1/v2/...
uri, err := url.ParseRequestURI(webseed)
if err != nil {
if strings.HasSuffix(webseed, ".toml") && dir.FileExist(webseed) {
webseedFileProviders = append(webseedFileProviders, webseed)
}
continue
}
webseedHttpProviders = append(webseedHttpProviders, uri)
continue
}

if strings.HasPrefix(webseed, "v1:") {
withoutVerisonPrefix := webseed[3:]
if !strings.HasPrefix(withoutVerisonPrefix, "https:") {
continue
}
uri, err := url.ParseRequestURI(withoutVerisonPrefix)
if err != nil {
log.Warn("[webseed] can't parse url", "err", err, "url", withoutVerisonPrefix)
continue
}
webseedHttpProviders = append(webseedHttpProviders, uri)
} else {
continue
}
}

_ = webseedFileProviders // todo add support of file providers
logger.Warn("file providers are not supported yet", "fileProviders", webseedFileProviders)

wseed := downloader.NewWebSeeds(webseedHttpProviders, log.LvlDebug, logger)
return wseed.VerifyManifestedBuckets(ctx, verifyFailfast)
}

func manifest(ctx context.Context, logger log.Logger) error {
dirs := datadir.New(datadirCli)

files, err := downloader.SeedableFiles(dirs, chain)
if err != nil {
return err
}

extList := []string{
".torrent",
".seg", ".idx", // e2
".kv", ".kvi", ".bt", ".kvei", // e3 domain
".v", ".vi", //e3 hist
".ef", ".efi", //e3 idx
".txt", //salt.txt
//".seg", ".idx", // e2
//".kv", ".kvi", ".bt", ".kvei", // e3 domain
//".v", ".vi", //e3 hist
//".ef", ".efi", //e3 idx
".txt", //salt.txt, manifest.txt
}
l, _ := dir.ListFiles(dirs.Snap, extList...)
for _, fPath := range l {
_, fName := filepath.Split(fPath)
fmt.Printf("%s\n", fName)
files = append(files, fName)
}
l, _ = dir.ListFiles(dirs.SnapDomain, extList...)
for _, fPath := range l {
_, fName := filepath.Split(fPath)
fmt.Printf("domain/%s\n", fName)
files = append(files, "domain/"+fName)
}
l, _ = dir.ListFiles(dirs.SnapHistory, extList...)
for _, fPath := range l {
_, fName := filepath.Split(fPath)
if strings.Contains(fName, "commitment") {
continue
}
fmt.Printf("history/%s\n", fName)
files = append(files, "history/"+fName)
}
l, _ = dir.ListFiles(dirs.SnapIdx, extList...)
for _, fPath := range l {
_, fName := filepath.Split(fPath)
if strings.Contains(fName, "commitment") {
continue
}
fmt.Printf("idx/%s\n", fName)
files = append(files, "idx/"+fName)
}
l, _ = dir.ListFiles(dirs.SnapAccessors, extList...)
for _, fPath := range l {
_, fName := filepath.Split(fPath)
if strings.Contains(fName, "commitment") {
continue
}
fmt.Printf("accessors/%s\n", fName)

sort.Strings(files)
for _, f := range files {
fmt.Printf("%s\n", f)
}
return nil
}
Expand Down
8 changes: 8 additions & 0 deletions cmd/downloader/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ downloader --datadir=<your> --chain=mainnet --webseed=<webseed_url>
downloader torrent_cat /path/to.torrent

downloader torrent_magnet /path/to.torrent

downloader torrent_clean --datadir <datadir> # remote all .torrent files in datadir
```

## Remote manifest verify
To check that remote webseeds has available manifest and all manifested files are available, has correct format of ETag, does not have dangling torrents etc.
```
downloader manifest-verify --chain <chain> [--webseeds 'a','b','c']
```

## Faster rsync
Expand Down
58 changes: 14 additions & 44 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type Downloader struct {
type webDownloadInfo struct {
url *url.URL
length int64
md5 string
torrent *torrent.Torrent
}

Expand All @@ -118,7 +119,7 @@ type AggStats struct {
LocalFileHashTime time.Duration
}

func New(ctx context.Context, cfg *downloadercfg.Cfg, dirs datadir.Dirs, logger log.Logger, verbosity log.Lvl, discover bool) (*Downloader, error) {
func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger, verbosity log.Lvl, discover bool) (*Downloader, error) {
db, c, m, torrentClient, err := openClient(ctx, cfg.Dirs.Downloader, cfg.Dirs.Snap, cfg.ClientConfig)
if err != nil {
return nil, fmt.Errorf("openClient: %w", err)
Expand Down Expand Up @@ -151,7 +152,7 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, dirs datadir.Dirs, logger
torrentClient: torrentClient,
lock: mutex,
stats: stats,
webseeds: &WebSeeds{logger: logger, verbosity: verbosity, downloadTorrentFile: cfg.DownloadTorrentFilesFromWebseed, torrentsWhitelist: lock.Downloads},
webseeds: NewWebSeeds(cfg.WebSeedUrls, verbosity, logger),
logger: logger,
verbosity: verbosity,
torrentFiles: &TorrentFiles{dir: cfg.Dirs.Snap},
Expand All @@ -161,13 +162,13 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, dirs datadir.Dirs, logger
downloading: map[string]struct{}{},
webseedsDiscover: discover,
}
d.webseeds.SetTorrent(d.torrentFiles, lock.Downloads, cfg.DownloadTorrentFilesFromWebseed)

if cfg.ClientConfig.DownloadRateLimiter != nil {
downloadLimit := cfg.ClientConfig.DownloadRateLimiter.Limit()
d.downloadLimit = &downloadLimit
}

d.webseeds.torrentFiles = d.torrentFiles
d.ctx, d.stopMainLoop = context.WithCancel(ctx)

if cfg.AddTorrentsFromDisk {
Expand Down Expand Up @@ -342,7 +343,7 @@ func initSnapshotLock(ctx context.Context, cfg *downloadercfg.Cfg, db kv.RoDB, s
Chain: cfg.ChainName,
}

files, err := seedableFiles(cfg.Dirs, cfg.ChainName)
files, err := SeedableFiles(cfg.Dirs, cfg.ChainName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -656,7 +657,8 @@ func (d *Downloader) mainLoop(silent bool) error {
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.webseeds.Discover(d.ctx, d.cfg.WebSeedUrls, d.cfg.WebSeedFiles, d.cfg.Dirs.Snap)
// webseeds.Discover may create new .torrent files on disk
d.webseeds.Discover(d.ctx, d.cfg.WebSeedFiles, d.cfg.Dirs.Snap)
// apply webseeds to existing torrents
if err := d.addTorrentFilesFromDisk(true); err != nil && !errors.Is(err, context.Canceled) {
d.logger.Warn("[snapshots] addTorrentFilesFromDisk", "err", err)
Expand Down Expand Up @@ -1272,8 +1274,6 @@ func (d *Downloader) checkComplete(name string) (bool, int64, *time.Time) {
}

func (d *Downloader) getWebDownloadInfo(t *torrent.Torrent) (webDownloadInfo, []*seedHash, error) {
torrentHash := t.InfoHash()

d.lock.RLock()
info, ok := d.webDownloadInfo[t.Name()]
d.lock.RUnlock()
Expand All @@ -1282,46 +1282,16 @@ func (d *Downloader) getWebDownloadInfo(t *torrent.Torrent) (webDownloadInfo, []
return info, nil, nil
}

seedHashMismatches := make([]*seedHash, 0, len(d.cfg.WebSeedUrls))

for _, webseed := range d.cfg.WebSeedUrls {
downloadUrl := webseed.JoinPath(t.Name())

if headRequest, err := http.NewRequestWithContext(d.ctx, "HEAD", downloadUrl.String(), nil); err == nil {
headResponse, err := http.DefaultClient.Do(headRequest)

if err != nil {
continue
}

headResponse.Body.Close()

if headResponse.StatusCode == http.StatusOK {
if meta, err := getWebpeerTorrentInfo(d.ctx, downloadUrl); err == nil {
if bytes.Equal(torrentHash.Bytes(), meta.HashInfoBytes().Bytes()) {
// TODO check the torrent's hash matches this hash
return webDownloadInfo{
url: downloadUrl,
length: headResponse.ContentLength,
torrent: t,
}, seedHashMismatches, nil
} else {
hash := meta.HashInfoBytes()
seedHashMismatches = append(seedHashMismatches, &seedHash{url: webseed, hash: &hash})
continue
}
}
}
}

seedHashMismatches = append(seedHashMismatches, &seedHash{url: webseed})
// todo this function does not exit on first matched webseed hash, could make unexpected results
infos, seedHashMismatches, err := d.webseeds.getWebDownloadInfo(d.ctx, t)
if err != nil || len(infos) == 0 {
return webDownloadInfo{}, seedHashMismatches, fmt.Errorf("can't find download info: %w", err)
}

return webDownloadInfo{}, seedHashMismatches, fmt.Errorf("can't find download info")
return infos[0], seedHashMismatches, nil
}

func getWebpeerTorrentInfo(ctx context.Context, downloadUrl *url.URL) (*metainfo.MetaInfo, error) {
torrentRequest, err := http.NewRequestWithContext(ctx, "GET", downloadUrl.String()+".torrent", nil)
torrentRequest, err := http.NewRequestWithContext(ctx, http.MethodGet, downloadUrl.String()+".torrent", nil)

if err != nil {
return nil, err
Expand Down Expand Up @@ -2215,7 +2185,7 @@ func (d *Downloader) AddMagnetLink(ctx context.Context, infoHash metainfo.Hash,
return nil
}

func seedableFiles(dirs datadir.Dirs, chainName string) ([]string, error) {
func SeedableFiles(dirs datadir.Dirs, chainName string) ([]string, error) {
files, err := seedableSegmentFiles(dirs.Snap, chainName)
if err != nil {
return nil, fmt.Errorf("seedableSegmentFiles: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestChangeInfoHashOfSameFile(t *testing.T) {
dirs := datadir.New(t.TempDir())
cfg, err := downloadercfg2.New(dirs, "", lg.Info, 0, 0, 0, 0, 0, nil, nil, "testnet", false)
require.NoError(err)
d, err := New(context.Background(), cfg, dirs, log.New(), log.LvlInfo, true)
d, err := New(context.Background(), cfg, log.New(), log.LvlInfo, true)
require.NoError(err)
defer d.Close()
err = d.AddMagnetLink(d.ctx, snaptype.Hex2InfoHash("aa"), "a.seg")
Expand Down
Loading