Skip to content

Commit 9b4bf2a

Browse files
committed
feat: rewrite dag stat to be awesomerer
Fixes #8794 Fixes #8791 This adds: - skip-raw-leaves option, that avoids downloading raw leaves when you already know their size in the parrent block - multithreaded download - properly count blocks that show up multiple times with different multicodec - do not count size of inlined blocks Speed: Way way faster (it's hard to make an objective number because that depends a lot on how much multithreading helps you and how many raw-leaves block you skip), in my case: `>100x` faster.
1 parent 04e7e95 commit 9b4bf2a

File tree

2 files changed

+153
-28
lines changed

2 files changed

+153
-28
lines changed

core/commands/dag/dag.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ import (
1717
)
1818

1919
const (
20-
pinRootsOptionName = "pin-roots"
21-
progressOptionName = "progress"
22-
silentOptionName = "silent"
23-
statsOptionName = "stats"
20+
pinRootsOptionName = "pin-roots"
21+
progressOptionName = "progress"
22+
skipRawleavesOptionName = "skip-raw-leaves"
23+
silentOptionName = "silent"
24+
statsOptionName = "stats"
2425
)
2526

2627
// DagCmd provides a subset of commands for interacting with ipld dag objects
@@ -305,6 +306,7 @@ Note: This command skips duplicate blocks in reporting both size and the number
305306
},
306307
Options: []cmds.Option{
307308
cmds.BoolOption(progressOptionName, "p", "Return progressive data while reading through the DAG").WithDefault(true),
309+
cmds.BoolOption(skipRawleavesOptionName, "s", "Skip raw leaves block when size is already known, faster but can lead to underestimating the size with malformed DAGs.").WithDefault(true),
308310
},
309311
Run: dagStat,
310312
Type: DagStat{},

core/commands/dag/stat.go

Lines changed: 147 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,164 @@
11
package dagcmd
22

33
import (
4+
"context"
45
"fmt"
56
"io"
67
"os"
78

89
"github.com/ipfs/go-ipfs/core/commands/cmdenv"
910
"github.com/ipfs/go-ipfs/core/commands/e"
10-
"github.com/ipfs/go-merkledag/traverse"
1111
"github.com/ipfs/interface-go-ipfs-core/path"
1212

13+
cid "github.com/ipfs/go-cid"
1314
cmds "github.com/ipfs/go-ipfs-cmds"
15+
ipld "github.com/ipfs/go-ipld-format"
1416
mdag "github.com/ipfs/go-merkledag"
17+
mh "github.com/multiformats/go-multihash"
1518
)
1619

20+
// Implement traversal here because there is a lot of adhoc logic to take care off
21+
// This is an async (multithreaded downloads but with a single contention point for processing) DFS implementation
22+
type statTraversal struct {
23+
runners int
24+
stats DagStat
25+
26+
ctx context.Context
27+
cancel context.CancelFunc
28+
res cmds.ResponseEmitter
29+
getter ipld.NodeGetter
30+
nodes chan *ipld.NodeOption
31+
progressive bool
32+
skipRawleaves bool
33+
stated map[string]struct{}
34+
seen map[string]struct{}
35+
}
36+
37+
func newStatTraversal(ctx context.Context, getter ipld.NodeGetter, res cmds.ResponseEmitter, progressive bool, skipRawleaves bool) *statTraversal {
38+
ctx, cancel := context.WithCancel(ctx)
39+
return &statTraversal{
40+
ctx: ctx,
41+
cancel: cancel,
42+
getter: getter,
43+
res: res,
44+
nodes: make(chan *ipld.NodeOption),
45+
progressive: progressive,
46+
skipRawleaves: skipRawleaves,
47+
// Use two different maps to correctly coiunt blocks with matching multihashes but different codecs
48+
stated: make(map[string]struct{}),
49+
seen: make(map[string]struct{}),
50+
}
51+
}
52+
53+
func (t *statTraversal) pump() error {
54+
defer t.cancel()
55+
for {
56+
select {
57+
case <-t.ctx.Done():
58+
return t.ctx.Err()
59+
case n := <-t.nodes:
60+
if n.Err != nil {
61+
return n.Err
62+
}
63+
t.runners--
64+
err := t.handleStating(n.Node.Cid(), uint64(len(n.Node.RawData())))
65+
if err != nil {
66+
return err
67+
}
68+
err = t.handleRecursion(n.Node)
69+
if err != nil {
70+
return err
71+
}
72+
73+
if t.runners == 0 {
74+
// FINISHED !
75+
return nil
76+
}
77+
}
78+
}
79+
}
80+
81+
func (t *statTraversal) handleStating(c cid.Cid, nodeLen uint64) error {
82+
k := string(c.Hash())
83+
if _, alreadyCounted := t.stated[k]; alreadyCounted {
84+
return nil
85+
}
86+
t.stated[k] = struct{}{}
87+
88+
if c.Prefix().MhType != mh.IDENTITY { // Do not count the size of inlined blocks
89+
t.stats.Size += nodeLen
90+
}
91+
t.stats.NumBlocks++
92+
93+
if t.progressive {
94+
if err := t.res.Emit(&t.stats); err != nil {
95+
return err
96+
}
97+
}
98+
return nil
99+
}
100+
101+
func (t *statTraversal) handleRecursion(node ipld.Node) error {
102+
scan := make([]cid.Cid, 0, len(node.Links())) // Prealoc enough capacity
103+
for _, l := range node.Links() {
104+
k := l.Cid.KeyString()
105+
if _, alreadySeen := t.seen[k]; alreadySeen {
106+
continue
107+
}
108+
t.seen[k] = struct{}{}
109+
110+
if t.skipRawleaves {
111+
prefix := l.Cid.Prefix()
112+
if prefix.Codec == cid.Raw && l.Size != 0 /* still fetch links with likely missing size */ {
113+
err := t.handleStating(l.Cid, l.Size)
114+
if err != nil {
115+
return err
116+
}
117+
continue
118+
}
119+
}
120+
121+
scan = append(scan, l.Cid)
122+
}
123+
124+
t.runners += len(scan)
125+
go func() {
126+
c := t.getter.GetMany(t.ctx, scan)
127+
for {
128+
select {
129+
case <-t.ctx.Done():
130+
return
131+
case v, ok := <-c:
132+
if !ok {
133+
return
134+
}
135+
select {
136+
case <-t.ctx.Done():
137+
return
138+
case t.nodes <- v:
139+
}
140+
}
141+
}
142+
}()
143+
return nil
144+
}
145+
146+
func (t *statTraversal) traverse(c cid.Cid) error {
147+
t.seen[c.KeyString()] = struct{}{}
148+
t.runners = 1
149+
go func() {
150+
node, err := t.getter.Get(t.ctx, c)
151+
select {
152+
case <-t.ctx.Done():
153+
case t.nodes <- &ipld.NodeOption{Node: node, Err: err}:
154+
}
155+
}()
156+
return t.pump()
157+
}
158+
17159
func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
18160
progressive := req.Options[progressOptionName].(bool)
161+
skipRawleaves := req.Options[skipRawleavesOptionName].(bool)
19162

20163
api, err := cmdenv.GetApi(env, req)
21164
if err != nil {
@@ -32,35 +175,15 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment)
32175
}
33176

34177
nodeGetter := mdag.NewSession(req.Context, api.Dag())
35-
obj, err := nodeGetter.Get(req.Context, rp.Cid())
36-
if err != nil {
37-
return err
38-
}
39-
40-
dagstats := &DagStat{}
41-
err = traverse.Traverse(obj, traverse.Options{
42-
DAG: nodeGetter,
43-
Order: traverse.DFSPre,
44-
Func: func(current traverse.State) error {
45-
dagstats.Size += uint64(len(current.Node.RawData()))
46-
dagstats.NumBlocks++
178+
t := newStatTraversal(req.Context, nodeGetter, res, progressive, skipRawleaves)
47179

48-
if progressive {
49-
if err := res.Emit(dagstats); err != nil {
50-
return err
51-
}
52-
}
53-
return nil
54-
},
55-
ErrFunc: nil,
56-
SkipDuplicates: true,
57-
})
180+
err = t.traverse(rp.Cid())
58181
if err != nil {
59182
return fmt.Errorf("error traversing DAG: %w", err)
60183
}
61184

62185
if !progressive {
63-
if err := res.Emit(dagstats); err != nil {
186+
if err := res.Emit(&t.stats); err != nil {
64187
return err
65188
}
66189
}

0 commit comments

Comments
 (0)