Skip to content
4 changes: 4 additions & 0 deletions cmd/influx_tools/compact/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ func (sc *shardCompactor) ParseFileName(path string) (int, int, error) {
return 0, 0, errors.New("not implemented")
}

func (sc *shardCompactor) SupportsCompactionPlanning() bool {
return false
}

func newShardCompactor(path string, logger *zap.Logger) (sc *shardCompactor, err error) {
sc = &shardCompactor{
logger: logger,
Expand Down
50 changes: 22 additions & 28 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,17 @@ type DefaultPlanner struct {
type fileStore interface {
Stats() []ExtFileStat
LastModified() time.Time
ParseFileName(path string) (int, int, error)
ParseFileName(path string) (generation int, sequence int, err error)
NextGeneration() int
TSMReader(path string) (*TSMReader, error)
SupportsCompactionPlanning() bool
}

func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPlanner {
if !fs.SupportsCompactionPlanning() {
// This should only happen due to developer mistakes.
panic("fileStore must support compaction planning")
}
return &DefaultPlanner{
FileStore: fs,
compactFullWriteColdDuration: writeColdDuration,
Expand All @@ -202,15 +207,13 @@ func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPl
// 000001-01.tsm, 000001-02.tsm would be in the same generation
// 000001 each with different sequence numbers.
type tsmGeneration struct {
id int
files []ExtFileStat
parseFileName ParseFileNameFunc
id int
files []ExtFileStat
}

func newTsmGeneration(id int, parseFileNameFunc ParseFileNameFunc) *tsmGeneration {
func newTsmGeneration(id int) *tsmGeneration {
return &tsmGeneration{
id: id,
parseFileName: parseFileNameFunc,
id: id,
}
}

Expand All @@ -227,13 +230,12 @@ func (t *tsmGeneration) size() uint64 {
func (t *tsmGeneration) level() int {
// Level 0 is always created from the result of a cache compaction. It generates
// 1 file with a sequence num of 1. Level 2 is generated by compacting multiple
// level 1 files. Level 3 is generate by compacting multiple level 2 files. Level
// level 1 files. Level 3 is generated by compacting multiple level 2 files. Level
// 4 is for anything else.
_, seq, _ := t.parseFileName(t.files[0].Path)
if seq < 4 {
return seq
}

if t.files[0].Sequence < 4 {
return t.files[0].Sequence
}
return 4
}

Expand Down Expand Up @@ -264,10 +266,6 @@ func (c *DefaultPlanner) SetFileStore(fs *FileStore) {
c.FileStore = fs
}

func (c *DefaultPlanner) ParseFileName(path string) (int, int, error) {
return c.FileStore.ParseFileName(path)
}

func (c *DefaultPlanner) generationsFullyCompacted(gens tsmGenerations) (bool, string) {
if len(gens) > 1 {
return false, "not fully compacted and not idle because of more than one generation"
Expand Down Expand Up @@ -718,17 +716,15 @@ func (c *DefaultPlanner) findGenerations(skipInUse bool) tsmGenerations {
tsmStats := c.FileStore.Stats()
generations := make(map[int]*tsmGeneration, len(tsmStats))
for _, f := range tsmStats {
gen, _, _ := c.ParseFileName(f.Path)

// Skip any files that are assigned to a current compaction plan
if _, ok := c.filesInUse[f.Path]; skipInUse && ok {
continue
}

group := generations[gen]
group := generations[f.Generation]
if group == nil {
group = newTsmGeneration(gen, c.ParseFileName)
generations[gen] = group
group = newTsmGeneration(f.Generation)
generations[f.Generation] = group
}
group.files = append(group.files, f)
}
Expand Down Expand Up @@ -800,7 +796,6 @@ type Compactor struct {
RateLimit limiter.Rate

formatFileName FormatFileNameFunc
parseFileName ParseFileNameFunc

mu sync.RWMutex
snapshotsEnabled bool
Expand All @@ -823,18 +818,13 @@ type Compactor struct {
func NewCompactor() *Compactor {
return &Compactor{
formatFileName: DefaultFormatFileName,
parseFileName: DefaultParseFileName,
}
}

func (c *Compactor) WithFormatFileNameFunc(formatFileNameFunc FormatFileNameFunc) {
c.formatFileName = formatFileNameFunc
}

func (c *Compactor) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) {
c.parseFileName = parseFileNameFunc
}

// Open initializes the Compactor.
func (c *Compactor) Open() {
c.mu.Lock()
Expand Down Expand Up @@ -1000,8 +990,12 @@ func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger, po
// set. We need to find that max generation as well as the max sequence
// number to ensure we write to the next unique location.
var maxGeneration, maxSequence int

if c.FileStore == nil {
return nil, fmt.Errorf("compactor for %s has no file store: %w", c.Dir, errCompactionsDisabled)
}
for _, f := range tsmFiles {
gen, seq, err := c.parseFileName(f)
gen, seq, err := c.FileStore.ParseFileName(f)
if err != nil {
return nil, err
}
Expand Down
Loading