Skip to content

Commit 53f09d7

Browse files
committed
account for the bytes-written stat across all the indexing options
1 parent 0080cf1 commit 53f09d7

7 files changed

Lines changed: 110 additions & 20 deletions

File tree

contentcoder.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"encoding/binary"
2020
"io"
2121
"reflect"
22+
"sync/atomic"
2223

2324
"github.com/golang/snappy"
2425
)
@@ -48,6 +49,9 @@ type chunkedContentCoder struct {
4849
chunkMeta []MetaData
4950

5051
compressed []byte // temp buf for snappy compression
52+
53+
// atomic access to this variable
54+
bytesWritten uint64
5155
}
5256

5357
// MetaData represents the data information inside a
@@ -105,6 +109,14 @@ func (c *chunkedContentCoder) Close() error {
105109
return c.flushContents()
106110
}
107111

112+
func (c *chunkedContentCoder) incrementBytesWritten(val uint64) {
113+
atomic.AddUint64(&c.bytesWritten, val)
114+
}
115+
116+
func (c *chunkedContentCoder) getBytesWritten() uint64 {
117+
return atomic.LoadUint64(&c.bytesWritten)
118+
}
119+
108120
func (c *chunkedContentCoder) flushContents() error {
109121
// flush the contents, with meta information at first
110122
buf := make([]byte, binary.MaxVarintLen64)
@@ -127,6 +139,7 @@ func (c *chunkedContentCoder) flushContents() error {
127139
c.final = append(c.final, c.chunkMetaBuf.Bytes()...)
128140
// write the compressed data to the final data
129141
c.compressed = snappy.Encode(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes())
142+
c.incrementBytesWritten(uint64(len(c.compressed)))
130143
c.final = append(c.final, c.compressed...)
131144

132145
c.chunkLens[c.currChunk] = uint64(len(c.compressed) + len(metaData))

docvalues.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,16 +142,20 @@ func (di *docValueReader) BytesRead() uint64 {
142142
return atomic.LoadUint64(&di.bytesRead)
143143
}
144144

145-
func (di *docValueReader) SetBytesRead(val uint64) {
145+
func (di *docValueReader) ResetBytesRead(val uint64) {
146146
atomic.StoreUint64(&di.bytesRead, val)
147147
}
148148

149149
func (di *docValueReader) incrementBytesRead(val uint64) {
150-
if segment.CollectIOStats {
150+
if CollectDiskStats {
151151
atomic.AddUint64(&di.bytesRead, val)
152152
}
153153
}
154154

155+
func (di *docValueReader) BytesWritten() uint64 {
156+
return 0
157+
}
158+
155159
func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error {
156160
// advance to the chunk where the docValues
157161
// reside for the given docNum

intDecoder.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ import (
1818
"encoding/binary"
1919
"fmt"
2020
"sync/atomic"
21-
22-
segment "github.com/blevesearch/scorch_segment_api/v2"
2321
)
2422

2523
type chunkedIntDecoder struct {
@@ -61,7 +59,7 @@ func newChunkedIntDecoder(buf []byte, offset uint64, rv *chunkedIntDecoder) *chu
6159
rv.chunkOffsets[i], read = binary.Uvarint(buf[offset+n : offset+n+binary.MaxVarintLen64])
6260
n += uint64(read)
6361
}
64-
if segment.CollectIOStats {
62+
if CollectDiskStats {
6563
atomic.AddUint64(&rv.bytesRead, n)
6664
}
6765
rv.dataStartOffset = offset + n
@@ -93,7 +91,7 @@ func (d *chunkedIntDecoder) loadChunk(chunk int) error {
9391
start += s
9492
end += e
9593
d.curChunkBytes = d.data[start:end]
96-
if segment.CollectIOStats {
94+
if CollectDiskStats {
9795
atomic.AddUint64(&d.bytesRead, uint64(len(d.curChunkBytes)))
9896
}
9997
if d.r == nil {

intcoder.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"bytes"
1919
"encoding/binary"
2020
"io"
21+
"sync/atomic"
2122
)
2223

2324
// We can safely use 0 to represent termNotEncoded since 0
@@ -34,6 +35,9 @@ type chunkedIntCoder struct {
3435
currChunk uint64
3536

3637
buf []byte
38+
39+
// atomic access to this variable
40+
bytesWritten uint64
3741
}
3842

3943
// newChunkedIntCoder returns a new chunk int coder which packs data into
@@ -73,6 +77,14 @@ func (c *chunkedIntCoder) SetChunkSize(chunkSize uint64, maxDocNum uint64) {
7377
}
7478
}
7579

80+
func (c *chunkedIntCoder) incrementBytesWritten(val uint64) {
81+
atomic.AddUint64(&c.bytesWritten, val)
82+
}
83+
84+
func (c *chunkedIntCoder) getBytesWritten() uint64 {
85+
return atomic.LoadUint64(&c.bytesWritten)
86+
}
87+
7688
// Add encodes the provided integers into the correct chunk for the provided
7789
// doc num. You MUST call Add() with increasing docNums.
7890
func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error {
@@ -94,6 +106,7 @@ func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error {
94106
if err != nil {
95107
return err
96108
}
109+
c.incrementBytesWritten(uint64(wb))
97110
}
98111

99112
return nil

new.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"math"
2121
"sort"
2222
"sync"
23+
"sync/atomic"
2324

2425
"github.com/RoaringBitmap/roaring"
2526
index "github.com/blevesearch/bleve_index_api"
@@ -32,6 +33,10 @@ var NewSegmentBufferNumResultsBump int = 100
3233
var NewSegmentBufferNumResultsFactor float64 = 1.0
3334
var NewSegmentBufferAvgBytesPerDocFactor float64 = 1.0
3435

36+
// This flag controls the disk stats collection from the segment files
37+
// during indexing and querying
38+
var CollectDiskStats bool
39+
3540
// ValidateDocFields can be set by applications to perform additional checks
3641
// on fields in a document being added to a new segment, by default it does
3742
// nothing.
@@ -80,6 +85,7 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document,
8085
if err == nil && s.reset() == nil {
8186
s.lastNumDocs = len(results)
8287
s.lastOutSize = len(br.Bytes())
88+
sb.setBytesWritten(s.getBytesWritten())
8389
interimPool.Put(s)
8490
}
8591

@@ -141,6 +147,9 @@ type interim struct {
141147

142148
lastNumDocs int
143149
lastOutSize int
150+
151+
// atomic access to this variable
152+
bytesWritten uint64
144153
}
145154

146155
func (s *interim) reset() (err error) {
@@ -484,6 +493,16 @@ func (s *interim) processDocument(docNum uint64,
484493
}
485494
}
486495

496+
func (s *interim) getBytesWritten() uint64 {
497+
return atomic.LoadUint64(&s.bytesWritten)
498+
}
499+
500+
func (s *interim) incrementBytesWritten(val uint64) {
501+
if CollectDiskStats {
502+
atomic.AddUint64(&s.bytesWritten, val)
503+
}
504+
}
505+
487506
func (s *interim) writeStoredFields() (
488507
storedIndexOffset uint64, err error) {
489508
varBuf := make([]byte, binary.MaxVarintLen64)
@@ -559,7 +578,7 @@ func (s *interim) writeStoredFields() (
559578
metaBytes := s.metaBuf.Bytes()
560579

561580
compressed = snappy.Encode(compressed[:cap(compressed)], data)
562-
581+
s.incrementBytesWritten(uint64(len(compressed)))
563582
docStoredOffsets[docNum] = uint64(s.w.Count())
564583

565584
_, err := writeUvarints(s.w,
@@ -597,6 +616,10 @@ func (s *interim) writeStoredFields() (
597616
return storedIndexOffset, nil
598617
}
599618

619+
func (s *interim) setBytesWritten(val uint64) {
620+
atomic.StoreUint64(&s.bytesWritten, val)
621+
}
622+
600623
func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err error) {
601624
dictOffsets = make([]uint64, len(s.FieldsInv))
602625

@@ -682,7 +705,7 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err
682705
if err != nil {
683706
return 0, nil, err
684707
}
685-
708+
prevBytesWritten := locEncoder.getBytesWritten()
686709
for _, loc := range locs[locOffset : locOffset+freqNorm.numLocs] {
687710
err = locEncoder.Add(docNum,
688711
uint64(loc.fieldID), loc.pos, loc.start, loc.end,
@@ -696,7 +719,9 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err
696719
return 0, nil, err
697720
}
698721
}
699-
722+
if locEncoder.getBytesWritten()-prevBytesWritten > 0 {
723+
s.incrementBytesWritten(locEncoder.getBytesWritten() - prevBytesWritten)
724+
}
700725
locOffset += freqNorm.numLocs
701726
}
702727

@@ -750,6 +775,8 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err
750775
return 0, nil, err
751776
}
752777

778+
s.incrementBytesWritten(uint64(len(vellumData)))
779+
753780
// reset vellum for reuse
754781
s.builderBuf.Reset()
755782

@@ -764,6 +791,7 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err
764791
if err != nil {
765792
return 0, nil, err
766793
}
794+
767795
fdvEncoder := newChunkedContentCoder(chunkSize, uint64(len(s.results)-1), s.w, false)
768796
if s.IncludeDocValues[fieldID] {
769797
for docNum, docTerms := range docTermMap {
@@ -772,13 +800,16 @@ func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err
772800
if err != nil {
773801
return 0, nil, err
774802
}
803+
// s.incrementBytesWritten(uint64(len(docTerms)))
775804
}
776805
}
777806
err = fdvEncoder.Close()
778807
if err != nil {
779808
return 0, nil, err
780809
}
781810

811+
s.setBytesWritten(s.getBytesWritten())
812+
782813
fdvOffsetsStart[fieldID] = uint64(s.w.Count())
783814

784815
_, err = fdvEncoder.Write()

posting.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ func (p *PostingsList) Count() uint64 {
254254
// The purpose of this implementation is to get
255255
// the bytes read from the postings lists stored
256256
// on disk, while querying
257-
func (p *PostingsList) SetBytesRead(val uint64) {
257+
func (p *PostingsList) ResetBytesRead(val uint64) {
258258
atomic.StoreUint64(&p.bytesRead, val)
259259
}
260260

@@ -263,11 +263,15 @@ func (p *PostingsList) BytesRead() uint64 {
263263
}
264264

265265
func (p *PostingsList) incrementBytesRead(val uint64) {
266-
if segment.CollectIOStats {
266+
if CollectDiskStats {
267267
atomic.AddUint64(&p.bytesRead, val)
268268
}
269269
}
270270

271+
func (p *PostingsList) BytesWritten() uint64 {
272+
return 0
273+
}
274+
271275
func (rv *PostingsList) read(postingsOffset uint64, d *Dictionary) error {
272276
rv.postingsOffset = postingsOffset
273277

@@ -365,7 +369,7 @@ func (i *PostingsIterator) Size() int {
365369
// the bytes read from the disk which includes
366370
// the freqNorm and location specific information
367371
// of a hit
368-
func (i *PostingsIterator) SetBytesRead(val uint64) {
372+
func (i *PostingsIterator) ResetBytesRead(val uint64) {
369373
atomic.StoreUint64(&i.bytesRead, val)
370374
}
371375

@@ -374,11 +378,15 @@ func (i *PostingsIterator) BytesRead() uint64 {
374378
}
375379

376380
func (i *PostingsIterator) incrementBytesRead(val uint64) {
377-
if segment.CollectIOStats {
381+
if CollectDiskStats {
378382
atomic.AddUint64(&i.bytesRead, val)
379383
}
380384
}
381385

386+
func (i *PostingsIterator) BytesWritten() uint64 {
387+
return 0
388+
}
389+
382390
func (i *PostingsIterator) loadChunk(chunk int) error {
383391
if i.includeFreqNorm {
384392
err := i.freqNormReader.loadChunk(chunk)
@@ -390,15 +398,15 @@ func (i *PostingsIterator) loadChunk(chunk int) error {
390398
// the postingsIterator is tracking only the chunk loaded
391399
// and the cumulation is tracked correctly in the downstream
392400
// intDecoder
393-
i.SetBytesRead(i.freqNormReader.getBytesRead())
401+
i.ResetBytesRead(i.freqNormReader.getBytesRead())
394402
}
395403

396404
if i.includeLocs {
397405
err := i.locReader.loadChunk(chunk)
398406
if err != nil {
399407
return err
400408
}
401-
i.SetBytesRead(i.locReader.getBytesRead())
409+
i.ResetBytesRead(i.locReader.getBytesRead())
402410
}
403411

404412
i.currChunk = uint32(chunk)

segment.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ type SegmentBase struct {
104104
size uint64
105105

106106
// atomic access to this variable
107-
bytesRead uint64
107+
bytesRead uint64
108+
bytesWritten uint64
108109

109110
m sync.Mutex
110111
fieldFSTs map[uint16]*vellum.FST
@@ -226,23 +227,45 @@ func (s *Segment) loadConfig() error {
226227
// interface, as the intention is to retrieve the bytes
227228
// read from the on-disk segment as part of the current
228229
// query.
229-
func (s *Segment) SetBytesRead(val uint64) {
230-
atomic.StoreUint64(&s.SegmentBase.bytesRead, val)
230+
func (s *Segment) ResetBytesRead(val uint64) {
231+
if CollectDiskStats {
232+
atomic.StoreUint64(&s.SegmentBase.bytesRead, val)
233+
}
231234
}
232235

233236
func (s *Segment) BytesRead() uint64 {
234237
return atomic.LoadUint64(&s.bytesRead) +
235238
atomic.LoadUint64(&s.SegmentBase.bytesRead)
236239
}
237240

241+
func (s *Segment) BytesWritten() uint64 {
242+
return 0
243+
}
244+
238245
func (s *Segment) incrementBytesRead(val uint64) {
239-
if segment.CollectIOStats {
246+
if CollectDiskStats {
240247
atomic.AddUint64(&s.bytesRead, val)
241248
}
242249
}
243250

251+
func (s *SegmentBase) BytesWritten() uint64 {
252+
return atomic.LoadUint64(&s.bytesWritten)
253+
}
254+
255+
func (s *SegmentBase) setBytesWritten(val uint64) {
256+
if CollectDiskStats {
257+
atomic.AddUint64(&s.bytesWritten, val)
258+
}
259+
}
260+
261+
func (s *SegmentBase) BytesRead() uint64 {
262+
return 0
263+
}
264+
265+
func (s *SegmentBase) ResetBytesRead(val uint64) {}
266+
244267
func (s *SegmentBase) incrementBytesRead(val uint64) {
245-
if segment.CollectIOStats {
268+
if CollectDiskStats {
246269
atomic.AddUint64(&s.bytesRead, val)
247270
}
248271
}

0 commit comments

Comments (0)