Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/influx_tools/compact/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ func (sc *shardCompactor) ParseFileName(path string) (int, int, error) {
return 0, 0, errors.New("not implemented")
}

func (sc *shardCompactor) SupportsCompactionPlanning() bool {
return false
}

func newShardCompactor(path string, logger *zap.Logger) (sc *shardCompactor, err error) {
sc = &shardCompactor{
logger: logger,
Expand Down
50 changes: 22 additions & 28 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,17 @@ type DefaultPlanner struct {
type fileStore interface {
Stats() []ExtFileStat
LastModified() time.Time
ParseFileName(path string) (int, int, error)
ParseFileName(path string) (generation int, sequence int, err error)
NextGeneration() int
TSMReader(path string) (*TSMReader, error)
SupportsCompactionPlanning() bool
}

func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPlanner {
if !fs.SupportsCompactionPlanning() {
// This should only happen due to developer mistakes.
panic("fileStore must support compaction planning")
}
return &DefaultPlanner{
FileStore: fs,
compactFullWriteColdDuration: writeColdDuration,
Expand All @@ -188,15 +193,13 @@ func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPl
// 000001-01.tsm, 000001-02.tsm would be in the same generation
// 000001 each with different sequence numbers.
type tsmGeneration struct {
id int
files []ExtFileStat
parseFileName ParseFileNameFunc
id int
files []ExtFileStat
}

func newTsmGeneration(id int, parseFileNameFunc ParseFileNameFunc) *tsmGeneration {
func newTsmGeneration(id int) *tsmGeneration {
return &tsmGeneration{
id: id,
parseFileName: parseFileNameFunc,
id: id,
}
}

Expand All @@ -213,13 +216,12 @@ func (t *tsmGeneration) size() uint64 {
func (t *tsmGeneration) level() int {
// Level 0 is always created from the result of a cache compaction. It generates
// 1 file with a sequence num of 1. Level 2 is generated by compacting multiple
// level 1 files. Level 3 is generate by compacting multiple level 2 files. Level
// level 1 files. Level 3 is generated by compacting multiple level 2 files. Level
// 4 is for anything else.
_, seq, _ := t.parseFileName(t.files[0].Path)
if seq < 4 {
return seq
}

if t.files[0].Sequence < 4 {
return t.files[0].Sequence
}
return 4
}

Expand Down Expand Up @@ -250,10 +252,6 @@ func (c *DefaultPlanner) SetFileStore(fs *FileStore) {
c.FileStore = fs
}

func (c *DefaultPlanner) ParseFileName(path string) (int, int, error) {
return c.FileStore.ParseFileName(path)
}

func (c *DefaultPlanner) generationsFullyCompacted(gens tsmGenerations) (bool, string) {
if len(gens) > 1 {
return false, "not fully compacted and not idle because of more than one generation"
Expand Down Expand Up @@ -704,17 +702,15 @@ func (c *DefaultPlanner) findGenerations(skipInUse bool) tsmGenerations {
tsmStats := c.FileStore.Stats()
generations := make(map[int]*tsmGeneration, len(tsmStats))
for _, f := range tsmStats {
gen, _, _ := c.ParseFileName(f.Path)

// Skip any files that are assigned to a current compaction plan
if _, ok := c.filesInUse[f.Path]; skipInUse && ok {
continue
}

group := generations[gen]
group := generations[f.Generation]
if group == nil {
group = newTsmGeneration(gen, c.ParseFileName)
generations[gen] = group
group = newTsmGeneration(f.Generation)
generations[f.Generation] = group
}
group.files = append(group.files, f)
}
Expand Down Expand Up @@ -786,7 +782,6 @@ type Compactor struct {
RateLimit limiter.Rate

formatFileName FormatFileNameFunc
parseFileName ParseFileNameFunc

mu sync.RWMutex
snapshotsEnabled bool
Expand All @@ -809,18 +804,13 @@ type Compactor struct {
func NewCompactor() *Compactor {
return &Compactor{
formatFileName: DefaultFormatFileName,
parseFileName: DefaultParseFileName,
}
}

func (c *Compactor) WithFormatFileNameFunc(formatFileNameFunc FormatFileNameFunc) {
c.formatFileName = formatFileNameFunc
}

func (c *Compactor) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) {
c.parseFileName = parseFileNameFunc
}

// Open initializes the Compactor.
func (c *Compactor) Open() {
c.mu.Lock()
Expand Down Expand Up @@ -986,8 +976,12 @@ func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger, po
// set. We need to find that max generation as well as the max sequence
// number to ensure we write to the next unique location.
var maxGeneration, maxSequence int

if c.FileStore == nil {
return nil, fmt.Errorf("compactor for %s has no file store: %w", c.Dir, errCompactionsDisabled)
}
for _, f := range tsmFiles {
gen, seq, err := c.parseFileName(f)
gen, seq, err := c.FileStore.ParseFileName(f)
if err != nil {
return nil, err
}
Expand Down
Loading