Skip to content
Closed
4 changes: 4 additions & 0 deletions cmd/influx_tools/compact/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ func (sc *shardCompactor) ParseFileName(path string) (int, int, error) {
return 0, 0, errors.New("not implemented")
}

// SupportsCompactionPlanning reports whether this file store can supply the
// pre-parsed generation/sequence metadata that compaction planning requires.
// It always returns false for shardCompactor; note that NewDefaultPlanner
// panics when handed a fileStore reporting false, so this compactor must not
// be used to construct a DefaultPlanner.
func (sc *shardCompactor) SupportsCompactionPlanning() bool {
	return false
}

func newShardCompactor(path string, logger *zap.Logger) (sc *shardCompactor, err error) {
sc = &shardCompactor{
logger: logger,
Expand Down
66 changes: 22 additions & 44 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,6 @@ type DefaultPlanner struct {
lastPlanCheck time.Time

mu sync.RWMutex
// lastFindGenerations is the last time findGenerations was run
lastFindGenerations time.Time

// lastGenerations is the last set of generations found by findGenerations
lastGenerations tsmGenerations

// forceFull causes the next full plan requests to plan any files
// that may need to be compacted. Normally, these files are skipped and scheduled
Expand All @@ -184,12 +179,17 @@ type DefaultPlanner struct {
type fileStore interface {
Stats() []ExtFileStat
LastModified() time.Time
ParseFileName(path string) (int, int, error)
ParseFileName(path string) (generation int, sequence int, err error)
NextGeneration() int
TSMReader(path string) (*TSMReader, error)
SupportsCompactionPlanning() bool
}

func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPlanner {
if !fs.SupportsCompactionPlanning() {
// This should only happen due to developer mistakes.
panic("fileStore must support compaction planning")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this should only occur if a dev messes up, but, do we really want a panic in production code? Also, would it be better to have a method called MustSupportCompactionPlanning() to put this in that is similar to other parts of the codebase where we panic?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had this conversation in Slack. Not panicking would be more invasive to the code (mainly test code), and the reason for even allowing a FileStore that didn't support compaction planning was to avoid a lot of test code changes. I'm also confident that you won't get this panic in production. If you panic in NewDefaultPlanner, influxd won't even start. You also can't trigger the panic if the code is used properly: NewFileStore always sets a parse filename function. You would have to either force the parse function to nil (not done anywhere in the codebase), create a FileStore without using NewFileStore (also not in the codebase), or create the various objects directly (only done in tests).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With regards to MustSupportCompactionPlanning, SupportsCompactionPlanning isn't where the panic comes from. It leaves the choice of how to handle the issue up to the caller.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good 👍

}
return &DefaultPlanner{
FileStore: fs,
compactFullWriteColdDuration: writeColdDuration,
Expand All @@ -202,15 +202,13 @@ func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPl
// 000001-01.tsm, 000001-02.tsm would be in the same generation
// 000001 each with different sequence numbers.
type tsmGeneration struct {
id int
files []ExtFileStat
parseFileName ParseFileNameFunc
id int
files []ExtFileStat
}

func newTsmGeneration(id int, parseFileNameFunc ParseFileNameFunc) *tsmGeneration {
func newTsmGeneration(id int) *tsmGeneration {
return &tsmGeneration{
id: id,
parseFileName: parseFileNameFunc,
id: id,
}
}

Expand All @@ -227,13 +225,12 @@ func (t *tsmGeneration) size() uint64 {
func (t *tsmGeneration) level() int {
// Level 0 is always created from the result of a cache compaction. It generates
// 1 file with a sequence num of 1. Level 2 is generated by compacting multiple
// level 1 files. Level 3 is generate by compacting multiple level 2 files. Level
// level 1 files. Level 3 is generated by compacting multiple level 2 files. Level
// 4 is for anything else.
_, seq, _ := t.parseFileName(t.files[0].Path)
if seq < 4 {
return seq
}

if t.files[0].Sequence < 4 {
return t.files[0].Sequence
}
return 4
}

Expand Down Expand Up @@ -264,10 +261,6 @@ func (c *DefaultPlanner) SetFileStore(fs *FileStore) {
c.FileStore = fs
}

// ParseFileName extracts the generation and sequence numbers from a TSM file
// path by delegating to the planner's underlying FileStore.
func (c *DefaultPlanner) ParseFileName(path string) (int, int, error) {
	return c.FileStore.ParseFileName(path)
}

func (c *DefaultPlanner) generationsFullyCompacted(gens tsmGenerations) (bool, string) {
if len(gens) > 1 {
return false, "not fully compacted and not idle because of more than one generation"
Expand Down Expand Up @@ -707,28 +700,18 @@ func (c *DefaultPlanner) findGenerations(skipInUse bool) tsmGenerations {
c.mu.Lock()
defer c.mu.Unlock()

last := c.lastFindGenerations
lastGen := c.lastGenerations

if !last.IsZero() && c.FileStore.LastModified().Equal(last) {
return lastGen
}

genTime := c.FileStore.LastModified()
tsmStats := c.FileStore.Stats()
generations := make(map[int]*tsmGeneration, len(tsmStats))
for _, f := range tsmStats {
gen, _, _ := c.ParseFileName(f.Path)

// Skip any files that are assigned to a current compaction plan
if _, ok := c.filesInUse[f.Path]; skipInUse && ok {
continue
}

group := generations[gen]
group := generations[f.Generation]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check that f.Generation and f.Sequence are not 0 before using them? This isn't an issue if we force supplying a parsing function or stop parameterizing it and always use the default.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we have to parameterize it to preserve tests that use unformatted TSM file names (produced by various mktemp-style calls).

if group == nil {
group = newTsmGeneration(gen, c.ParseFileName)
generations[gen] = group
group = newTsmGeneration(f.Generation)
generations[f.Generation] = group
}
group.files = append(group.files, f)
}
Expand All @@ -741,9 +724,6 @@ func (c *DefaultPlanner) findGenerations(skipInUse bool) tsmGenerations {
sort.Sort(orderedGenerations)
}

c.lastFindGenerations = genTime
c.lastGenerations = orderedGenerations

return orderedGenerations
}

Expand Down Expand Up @@ -800,7 +780,6 @@ type Compactor struct {
RateLimit limiter.Rate

formatFileName FormatFileNameFunc
parseFileName ParseFileNameFunc

mu sync.RWMutex
snapshotsEnabled bool
Expand All @@ -823,18 +802,13 @@ type Compactor struct {
func NewCompactor() *Compactor {
return &Compactor{
formatFileName: DefaultFormatFileName,
parseFileName: DefaultParseFileName,
}
}

// WithFormatFileNameFunc overrides the function the Compactor uses to build
// TSM file names (NewCompactor installs DefaultFormatFileName).
// NOTE(review): the write is not guarded by c.mu — presumably this is only
// called during setup before the Compactor is shared; confirm with callers.
func (c *Compactor) WithFormatFileNameFunc(formatFileNameFunc FormatFileNameFunc) {
	c.formatFileName = formatFileNameFunc
}

// WithParseFileNameFunc overrides the function the Compactor uses to parse
// generation and sequence numbers out of TSM file names.
// NOTE(review): like WithFormatFileNameFunc, the write is not guarded by
// c.mu — assumed to run during setup only; confirm with callers.
func (c *Compactor) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) {
	c.parseFileName = parseFileNameFunc
}

// Open initializes the Compactor.
func (c *Compactor) Open() {
c.mu.Lock()
Expand Down Expand Up @@ -1000,8 +974,12 @@ func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger, po
// set. We need to find that max generation as well as the max sequence
// number to ensure we write to the next unique location.
var maxGeneration, maxSequence int

if c.FileStore == nil {
return nil, fmt.Errorf("compactor for %s has no file store: %w", c.Dir, errCompactionsDisabled)
}
for _, f := range tsmFiles {
gen, seq, err := c.parseFileName(f)
gen, seq, err := c.FileStore.ParseFileName(f)
if err != nil {
return nil, err
}
Expand Down
Loading