Skip to content
Closed
61 changes: 17 additions & 44 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,6 @@ type DefaultPlanner struct {
lastPlanCheck time.Time

mu sync.RWMutex
// lastFindGenerations is the last time findGenerations was run
lastFindGenerations time.Time

// lastGenerations is the last set of generations found by findGenerations
lastGenerations tsmGenerations

// forceFull causes the next full plan requests to plan any files
// that may need to be compacted. Normally, these files are skipped and scheduled
Expand All @@ -184,7 +179,7 @@ type DefaultPlanner struct {
type fileStore interface {
Stats() []ExtFileStat
LastModified() time.Time
ParseFileName(path string) (int, int, error)
ParseFileName(path string) (generation int, sequence int, err error)
NextGeneration() int
TSMReader(path string) (*TSMReader, error)
}
Expand All @@ -202,15 +197,13 @@ func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPl
// 000001-01.tsm, 000001-02.tsm would be in the same generation
// 000001 each with different sequence numbers.
type tsmGeneration struct {
id int
files []ExtFileStat
parseFileName ParseFileNameFunc
id int
files []ExtFileStat
}

func newTsmGeneration(id int, parseFileNameFunc ParseFileNameFunc) *tsmGeneration {
func newTsmGeneration(id int) *tsmGeneration {
return &tsmGeneration{
id: id,
parseFileName: parseFileNameFunc,
id: id,
}
}

Expand All @@ -227,13 +220,12 @@ func (t *tsmGeneration) size() uint64 {
func (t *tsmGeneration) level() int {
// Level 0 is always created from the result of a cache compaction. It generates
// 1 file with a sequence num of 1. Level 2 is generated by compacting multiple
// level 1 files. Level 3 is generate by compacting multiple level 2 files. Level
// level 1 files. Level 3 is generated by compacting multiple level 2 files. Level
// 4 is for anything else.
_, seq, _ := t.parseFileName(t.files[0].Path)
if seq < 4 {
return seq
}

if t.files[0].Sequence < 4 {
return t.files[0].Sequence
}
return 4
}

Expand Down Expand Up @@ -264,10 +256,6 @@ func (c *DefaultPlanner) SetFileStore(fs *FileStore) {
c.FileStore = fs
}

func (c *DefaultPlanner) ParseFileName(path string) (int, int, error) {
return c.FileStore.ParseFileName(path)
}

func (c *DefaultPlanner) generationsFullyCompacted(gens tsmGenerations) (bool, string) {
if len(gens) > 1 {
return false, "not fully compacted and not idle because of more than one generation"
Expand Down Expand Up @@ -707,28 +695,18 @@ func (c *DefaultPlanner) findGenerations(skipInUse bool) tsmGenerations {
c.mu.Lock()
defer c.mu.Unlock()

last := c.lastFindGenerations
lastGen := c.lastGenerations

if !last.IsZero() && c.FileStore.LastModified().Equal(last) {
return lastGen
}

genTime := c.FileStore.LastModified()
tsmStats := c.FileStore.Stats()
generations := make(map[int]*tsmGeneration, len(tsmStats))
for _, f := range tsmStats {
gen, _, _ := c.ParseFileName(f.Path)

// Skip any files that are assigned to a current compaction plan
if _, ok := c.filesInUse[f.Path]; skipInUse && ok {
continue
}

group := generations[gen]
group := generations[f.Generation]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check that f.Generation and f.Sequence are not 0 before using them? This isn't an issue if we force supplying a parsing function or stop parameterizing it and always use the default.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we have to parameterize it to preserve tests which use unformatted TSM file names (from various mktemp sort of calls).

if group == nil {
group = newTsmGeneration(gen, c.ParseFileName)
generations[gen] = group
group = newTsmGeneration(f.Generation)
generations[f.Generation] = group
}
group.files = append(group.files, f)
}
Expand All @@ -741,9 +719,6 @@ func (c *DefaultPlanner) findGenerations(skipInUse bool) tsmGenerations {
sort.Sort(orderedGenerations)
}

c.lastFindGenerations = genTime
c.lastGenerations = orderedGenerations

return orderedGenerations
}

Expand Down Expand Up @@ -800,7 +775,6 @@ type Compactor struct {
RateLimit limiter.Rate

formatFileName FormatFileNameFunc
parseFileName ParseFileNameFunc

mu sync.RWMutex
snapshotsEnabled bool
Expand All @@ -823,18 +797,13 @@ type Compactor struct {
func NewCompactor() *Compactor {
return &Compactor{
formatFileName: DefaultFormatFileName,
parseFileName: DefaultParseFileName,
}
}

func (c *Compactor) WithFormatFileNameFunc(formatFileNameFunc FormatFileNameFunc) {
c.formatFileName = formatFileNameFunc
}

func (c *Compactor) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) {
c.parseFileName = parseFileNameFunc
}

// Open initializes the Compactor.
func (c *Compactor) Open() {
c.mu.Lock()
Expand Down Expand Up @@ -1000,8 +969,12 @@ func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger, po
// set. We need to find that max generation as well as the max sequence
// number to ensure we write to the next unique location.
var maxGeneration, maxSequence int

if c.FileStore == nil {
return nil, fmt.Errorf("compactor for %s has no file store: %w", c.Dir, errCompactionsDisabled)
}
for _, f := range tsmFiles {
gen, seq, err := c.parseFileName(f)
gen, seq, err := c.FileStore.ParseFileName(f)
if err != nil {
return nil, err
}
Expand Down
Loading