Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
651d946
feat: Adds WARN log for series deletion
devanbenz Jan 13, 2026
66c3775
feat: Use 10_000 series constant instead of 24 hours
devanbenz Jan 13, 2026
7893021
feat: Use info log
devanbenz Jan 13, 2026
1742429
feat: Let's add the flush changes to this PR
devanbenz Jan 14, 2026
6ae9a25
feat: adjust comment
devanbenz Jan 14, 2026
a53bb61
feat: Use a modulo
devanbenz Jan 14, 2026
07a6c66
feat: Rename vars for better clarity
devanbenz Jan 14, 2026
67414c4
feat: Use f.partitions index directly
devanbenz Jan 14, 2026
e6dc601
feat: Remove code dupe, create DeleteSeries which takes and iter and fn
devanbenz Jan 15, 2026
d3dc71d
feat: Updates to errors in segment flushing
devanbenz Jan 15, 2026
2c6528a
feat: accidentally moved comment
devanbenz Jan 15, 2026
d536fad
feat: Let's just revert back to ss.ForEach
devanbenz Jan 15, 2026
519d521
feat: Just log out flush errors no need to exit function
devanbenz Jan 15, 2026
a057a53
fix: Don't need locks in for each
devanbenz Jan 15, 2026
617a1b5
chore: fix comment for epoch tracker
devanbenz Jan 15, 2026
f9a9530
chore: again
devanbenz Jan 15, 2026
5b919e3
feat: create no flush test
devanbenz Jan 15, 2026
ceded32
feat: Update test to use testify
devanbenz Jan 15, 2026
aba4bbe
feat: update comment
devanbenz Jan 15, 2026
fefb99d
feat: Return flush, add sfile path to log, optimize deleteSeriesRange
devanbenz Jan 16, 2026
352856f
feat: Add locking to FlushSegments
devanbenz Jan 16, 2026
ab5406e
feat: remove test logging
devanbenz Jan 16, 2026
71a35e7
feat: Parallel FlushSegments
devanbenz Jan 16, 2026
24fdea2
feat: Add FlushSegments test, use SeriesFilePartitionN for err chan len
devanbenz Jan 16, 2026
881f2af
feat: Remove dead code
devanbenz Jan 16, 2026
5014c28
feat: errChan simplification
devanbenz Jan 16, 2026
fbb2aab
feat: Add segment name to error logs
devanbenz Jan 16, 2026
63b90d4
feat: pre-allocate in test
devanbenz Jan 16, 2026
2de8798
feat: one more segment.Flush()
devanbenz Jan 16, 2026
553e13d
feat: fix make for slices
devanbenz Jan 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions tsdb/series_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,30 @@ func (f *SeriesFile) DeleteSeriesID(id uint64) error {
return p.DeleteSeriesID(id)
}

// DeleteSeriesIDNoFlush flags a series as permanently deleted.
// This method returns a SeriesPartition that can be then used later
// for fsync operations.
// DeleteSeriesIDNoFlush flags a series as permanently deleted without
// forcing an fsync. The partition owning the series is returned so the
// caller can batch deletions and sync the affected partitions later
// (e.g. via FlushSegments). The partition is returned even when the
// delete itself fails.
func (f *SeriesFile) DeleteSeriesIDNoFlush(id uint64) (*SeriesPartition, error) {
	partition := f.SeriesIDPartition(id)
	if partition == nil {
		return nil, ErrInvalidSeriesPartitionID
	}
	err := partition.DeleteSeriesIDNoFlush(id)
	return partition, err
}

// FlushSegments flushes a group of partitions by id
func (f *SeriesFile) FlushSegments(partitionIDs map[int]struct{}) error {
for id := range partitionIDs {
p := f.partitions[id]
if segment := p.activeSegment(); segment != nil {
if err := segment.Flush(); err != nil {
return err
}
}
}
return nil
}

// IsDeleted returns true if the ID has been deleted before.
func (f *SeriesFile) IsDeleted(id uint64) bool {
p := f.SeriesIDPartition(id)
Expand Down
44 changes: 44 additions & 0 deletions tsdb/series_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,50 @@ func (p *SeriesPartition) Compacting() bool {
return p.compacting
}

// DeleteSeriesIDNoFlush flags a series as permanently deleted.
// This operation does not incur an fsync.
// If the series is reintroduced later then it must create a new id.
// DeleteSeriesIDNoFlush flags a series as permanently deleted.
// Unlike DeleteSeriesID, this does not incur an fsync; callers are
// expected to flush the partition's segment themselves afterwards.
// If the series is reintroduced later then it must create a new id.
func (p *SeriesPartition) DeleteSeriesIDNoFlush(id uint64) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	switch {
	case p.closed:
		return ErrSeriesPartitionClosed
	case p.index.IsDeleted(id):
		// Already tombstoned; nothing to do.
		return nil
	}

	// Append a tombstone entry to the log (no fsync here).
	if _, err := p.writeLogEntry(AppendSeriesEntry(nil, SeriesEntryTombstoneFlag, id, nil)); err != nil {
		return err
	}

	// Reflect the deletion in the in-memory index.
	p.index.Delete(id)
	return nil
}

// FlushSegment fsyncs a segment to disk
func (p *SeriesPartition) FlushSegment() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return ErrSeriesPartitionClosed
}

if segment := p.activeSegment(); segment != nil {
if err := segment.Flush(); err != nil {
return err
}
}
return nil
}

// DeleteSeriesID flags a series as permanently deleted.
// If the series is reintroduced later then it must create a new id.
func (p *SeriesPartition) DeleteSeriesID(id uint64) error {
Expand Down
26 changes: 25 additions & 1 deletion tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,15 +1073,39 @@ func (s *Store) DeleteShard(shardID uint64) error {
}
}

const DeleteLogTrigger = 10_000
seriesCount := ss.Cardinality()
deleteStart := time.Now()
var deletedCount atomic.Uint64
var partitionIDs = make(map[int]struct{})

ss.ForEach(func(id uint64) {
if err := sfile.DeleteSeriesID(id); err != nil {
part, err := sfile.DeleteSeriesIDNoFlush(id)
if err != nil {
sfile.Logger.Error(
"cannot delete series in shard",
zap.Uint64("series_id", id),
zap.Uint64("shard_id", shardID),
zap.Error(err))
} else {
partitionIDs[part.id] = struct{}{}
deleted := deletedCount.Add(1)

if deleted%DeleteLogTrigger == 0 {
s.Logger.Info(fmt.Sprintf("DeleteShard: %d series deleted", DeleteLogTrigger),
zap.String("db", db),
zap.Uint64("shard_id", shardID),
zap.Uint64("deleted", deleted),
zap.Uint64("remaining", seriesCount-deleted),
zap.Uint64("total", seriesCount),
zap.Duration("elapsed", time.Since(deleteStart)))
}
}
})

if err := sfile.FlushSegments(partitionIDs); err != nil {
return err
}
}
}

Expand Down