Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
651d946
feat: Adds WARN log for series deletion
devanbenz Jan 13, 2026
66c3775
feat: Use 10_000 series constant instead of 24 hours
devanbenz Jan 13, 2026
7893021
feat: Use info log
devanbenz Jan 13, 2026
1742429
feat: Let's add the flush changes to this PR
devanbenz Jan 14, 2026
6ae9a25
feat: adjust comment
devanbenz Jan 14, 2026
a53bb61
feat: Use a modulo
devanbenz Jan 14, 2026
07a6c66
feat: Rename vars for better clarity
devanbenz Jan 14, 2026
67414c4
feat: Use f.partitions index directly
devanbenz Jan 14, 2026
e6dc601
feat: Remove code dupe, create DeleteSeries which takes and iter and fn
devanbenz Jan 15, 2026
d3dc71d
feat: Updates to errors in segment flushing
devanbenz Jan 15, 2026
2c6528a
feat: accidently moved comment
devanbenz Jan 15, 2026
d536fad
feat: Lets just revert back to ss.ForEach
devanbenz Jan 15, 2026
519d521
feat: Just log out flush errors no need to exit function
devanbenz Jan 15, 2026
a057a53
fix: Don't need locks in for each
devanbenz Jan 15, 2026
617a1b5
chore: fix comment for epoch tracker
devanbenz Jan 15, 2026
f9a9530
chore: again
devanbenz Jan 15, 2026
5b919e3
feat: create no flush test
devanbenz Jan 15, 2026
ceded32
feat: Update test to use testify
devanbenz Jan 15, 2026
aba4bbe
feat: update comment
devanbenz Jan 15, 2026
fefb99d
feat: Return flush, add sfile path to log, optimize deleteSeriesRange
devanbenz Jan 16, 2026
352856f
feat: Add locking to FlushSegments
devanbenz Jan 16, 2026
ab5406e
feat: remove test logging
devanbenz Jan 16, 2026
71a35e7
feat: Parallel FlushSegments
devanbenz Jan 16, 2026
24fdea2
feat: Add FlushSegments test, use SeriesFilePartitionN for err chan len
devanbenz Jan 16, 2026
881f2af
feat: Remove dead code
devanbenz Jan 16, 2026
5014c28
feat: errChan simplification
devanbenz Jan 16, 2026
fbb2aab
feat: Add segment name to error logs
devanbenz Jan 16, 2026
63b90d4
feat: pre-allocate in test
devanbenz Jan 16, 2026
2de8798
feat: one more segment.Flush()
devanbenz Jan 16, 2026
553e13d
feat: fix make for slices
devanbenz Jan 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/influx_inspect/verify/seriesfile/verify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func NewTest(t *testing.T) *Test {
}

// delete one series
if err := seriesFile.DeleteSeriesID(ids[0]); err != nil {
if _, err := seriesFile.DeleteSeriesID(ids[0], tsdb.Flush); err != nil {
return err
}

Expand Down
12 changes: 11 additions & 1 deletion tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1852,12 +1852,15 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
// Remove the remaining ids from the series file as they no longer exist
// in any shard.
var err error
var partitionIDs = make(map[int]struct{}, tsdb.SeriesFilePartitionN)
ids.ForEach(func(id uint64) {
name, tags := e.sfile.Series(id)
if err1 := e.sfile.DeleteSeriesID(id); err1 != nil {
part, err1 := e.sfile.DeleteSeriesID(id, tsdb.NoFlush)
if err1 != nil {
err = err1
return
}
partitionIDs[part.ID()] = struct{}{}

// In the case of the inmem index the series can be removed across
// the global index (all shards).
Expand All @@ -1871,6 +1874,13 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
if err != nil {
return err
}

if err := e.sfile.FlushSegments(partitionIDs); err != nil {
e.sfile.Logger.Error(
"error while flushing a series file segment",
zap.String("series_file_path", e.sfile.Path()),
zap.Error(err))
}
}

return nil
Expand Down
42 changes: 38 additions & 4 deletions tsdb/series_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ const SeriesIDSize = 8
const (
// SeriesFilePartitionN is the number of partitions a series file is split into.
SeriesFilePartitionN = 8
// Flush lets us know when to fsync
Flush = true
// NoFlush lets us know when to not fsync
NoFlush = false
)

// SeriesFile represents the section of the index that holds series data.
Expand Down Expand Up @@ -194,13 +198,43 @@ func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []mod
}

// DeleteSeriesID flags a series as permanently deleted.
// If the series is reintroduced later then it must create a new id.
func (f *SeriesFile) DeleteSeriesID(id uint64) error {
// If the series is reintroduced later than it must create a new id.
// Setting flush will indicate whether this method triggers a fsync.
func (f *SeriesFile) DeleteSeriesID(id uint64, flush bool) (*SeriesPartition, error) {
p := f.SeriesIDPartition(id)
if p == nil {
return ErrInvalidSeriesPartitionID
return nil, ErrInvalidSeriesPartitionID
}
return p.DeleteSeriesID(id)
return p, p.DeleteSeriesID(id, flush)
}

// FlushSegments fsyncs the active segment of every partition listed in
// partitionIDs. Flushes run concurrently, one goroutine per partition, and
// all individual flush failures are combined into the returned error via
// errors.Join. An unknown partition ID returns ErrInvalidSeriesPartitionID
// before any flush is attempted.
func (f *SeriesFile) FlushSegments(partitionIDs map[int]struct{}) error {
	// Validate every ID before spawning goroutines so we never index
	// f.partitions out of range (which would panic) mid-flight.
	for id := range partitionIDs {
		if id < 0 || id >= len(f.partitions) {
			return ErrInvalidSeriesPartitionID
		}
	}

	var wg sync.WaitGroup
	// Buffered to the goroutine count so senders never block and the
	// collection loop can run strictly after wg.Wait().
	errCh := make(chan error, len(partitionIDs))

	for id := range partitionIDs {
		wg.Add(1)
		p := f.partitions[id]
		go func() {
			defer wg.Done()
			// Hold the partition lock while flushing, matching how
			// SeriesPartition.DeleteSeriesID guards its own flush.
			p.mu.Lock()
			defer p.mu.Unlock()
			if segment := p.activeSegment(); segment != nil {
				if err := segment.Flush(); err != nil {
					errCh <- fmt.Errorf("unable to flush segment %s: %w", segment.file.Name(), err)
				}
			}
		}()
	}

	wg.Wait()
	close(errCh)

	errs := make([]error, 0, len(partitionIDs))
	for err := range errCh {
		errs = append(errs, err)
	}
	return errors.Join(errs...)
}

// IsDeleted returns true if the ID has been deleted before.
Expand Down
109 changes: 81 additions & 28 deletions tsdb/series_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -191,38 +192,59 @@ func TestSeriesFileCompactor(t *testing.T) {

// Ensure series file deletions persist across compactions.
func TestSeriesFile_DeleteSeriesID(t *testing.T) {
sfile := MustOpenSeriesFile()
defer sfile.Close()

ids0, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil}, tsdb.NoopStatsTracker())
if err != nil {
t.Fatal(err)
} else if _, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m2")}, []models.Tags{nil}, tsdb.NoopStatsTracker()); err != nil {
t.Fatal(err)
} else if err := sfile.ForceCompact(); err != nil {
t.Fatal(err)
deleteTestFn := func(flush bool) {
sfile := MustOpenSeriesFile()
defer func(sfile *SeriesFile) {
err := sfile.Close()
require.NoError(t, err, "close sfile")
}(sfile)

ids0, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil}, tsdb.NoopStatsTracker())
require.NoError(t, err, "create series list")
_, err = sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m2")}, []models.Tags{nil}, tsdb.NoopStatsTracker())
require.NoError(t, err, "create series list")
err = sfile.ForceCompact()
require.NoError(t, err, "force compact")

// Delete and ensure deletion.
_, err = sfile.DeleteSeriesID(ids0[0], flush)
require.NoError(t, err, "delete series list")
_, err = sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil}, tsdb.NoopStatsTracker())
require.NoError(t, err, "create series list")
require.True(t, sfile.IsDeleted(ids0[0]), "expected deleted")

err = sfile.ForceCompact()
require.NoError(t, err, "force compact")
require.True(t, sfile.IsDeleted(ids0[0]), "expected deleted")

err = sfile.Reopen()
require.NoError(t, err, "reopen")
require.True(t, sfile.IsDeleted(ids0[0]), "expected deleted")
}

// Delete and ensure deletion.
if err := sfile.DeleteSeriesID(ids0[0]); err != nil {
t.Fatal(err)
} else if _, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil}, tsdb.NoopStatsTracker()); err != nil {
t.Fatal(err)
} else if !sfile.IsDeleted(ids0[0]) {
t.Fatal("expected deletion before compaction")
tests := []struct {
name string
fn func()
}{{
name: "delete series with flush",
fn: func() {
deleteTestFn(tsdb.Flush)
},
},
{
name: "delete series with no flush",
fn: func() {
deleteTestFn(tsdb.NoFlush)
},
},
}

if err := sfile.ForceCompact(); err != nil {
t.Fatal(err)
} else if !sfile.IsDeleted(ids0[0]) {
t.Fatal("expected deletion after compaction")
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.fn()
})
}

if err := sfile.Reopen(); err != nil {
t.Fatal(err)
} else if !sfile.IsDeleted(ids0[0]) {
t.Fatal("expected deletion after reopen")
}
}

func TestSeriesFile_Compaction(t *testing.T) {
Expand All @@ -246,7 +268,7 @@ func TestSeriesFile_Compaction(t *testing.T) {
// Delete a subset of keys.
for i, id := range ids {
if i%10 == 0 {
if err := sfile.DeleteSeriesID(id); err != nil {
if _, err := sfile.DeleteSeriesID(id, tsdb.Flush); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -325,7 +347,7 @@ func BenchmarkSeriesFile_Compaction(b *testing.B) {

// Delete a subset of keys.
for i := 0; i < len(ids); i += 10 {
if err := sfile.DeleteSeriesID(ids[i]); err != nil {
if _, err := sfile.DeleteSeriesID(ids[i], tsdb.Flush); err != nil {
b.Fatal(err)
}
}
Expand Down Expand Up @@ -353,6 +375,37 @@ func BenchmarkSeriesFile_Compaction(b *testing.B) {
}
}

// TestSeriesFile_FlushSegments verifies that flushing the active segments of
// every partition holding freshly-created series succeeds without losing any
// series data.
func TestSeriesFile_FlushSegments(t *testing.T) {
	const SeriesN = 100
	sfile := MustOpenSeriesFile()
	defer func() {
		require.NoError(t, sfile.Close(), "series file close")
	}()

	// Seed the file with SeriesN distinct measurements so the segments
	// contain data to flush.
	names := make([][]byte, SeriesN)
	tagsSlice := make([]models.Tags, SeriesN)
	for i := range names {
		names[i] = []byte(fmt.Sprintf("measurement%d", i))
		tagsSlice[i] = models.NewTags(map[string]string{"tag": "value"})
	}
	ids, err := sfile.CreateSeriesListIfNotExists(names, tagsSlice, tsdb.NoopStatsTracker())
	require.NoError(t, err)

	// Record which partitions ended up owning series.
	partitionIDs := make(map[int]struct{}, tsdb.SeriesFilePartitionN)
	for _, id := range ids {
		partitionIDs[sfile.SeriesIDPartitionID(id)] = struct{}{}
	}

	// Flush those partitions and confirm nothing was lost.
	require.NoError(t, sfile.FlushSegments(partitionIDs))
	require.Equal(t, uint64(SeriesN), sfile.SeriesCount())
}

// Series represents name/tagset pairs that are used in testing.
type Series struct {
Name []byte
Expand Down
15 changes: 9 additions & 6 deletions tsdb/series_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitio
// Flush active segment writes so we can access data in mmap.
if segment := p.activeSegment(); segment != nil {
if err := segment.Flush(); err != nil {
return err
return fmt.Errorf("unable to flush segment %s: %w", segment.file.Name(), err)
}
}

Expand Down Expand Up @@ -329,8 +329,9 @@ func (p *SeriesPartition) Compacting() bool {
}

// DeleteSeriesID flags a series as permanently deleted.
// If the series is reintroduced later then it must create a new id.
func (p *SeriesPartition) DeleteSeriesID(id uint64) error {
// If the series is reintroduced later then it must create a new id.
// Setting flush will indicate whether this method triggers a fsync.
func (p *SeriesPartition) DeleteSeriesID(id uint64, flush bool) error {
p.mu.Lock()
defer p.mu.Unlock()

Expand All @@ -350,9 +351,11 @@ func (p *SeriesPartition) DeleteSeriesID(id uint64) error {
}

// Flush active segment write.
if segment := p.activeSegment(); segment != nil {
if err := segment.Flush(); err != nil {
return err
if flush {
if segment := p.activeSegment(); segment != nil {
if err := segment.Flush(); err != nil {
return fmt.Errorf("unable to flush segment %s: %w", segment.file.Name(), err)
}
}
}

Expand Down
32 changes: 31 additions & 1 deletion tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,15 +1073,45 @@ func (s *Store) DeleteShard(shardID uint64) error {
}
}

const DeleteLogTrigger = 10_000
seriesCount := ss.Cardinality()
deleteStart := time.Now()
var deletedCount atomic.Uint64
var partitionIDs = make(map[int]struct{}, SeriesFilePartitionN)

ss.ForEach(func(id uint64) {
if err := sfile.DeleteSeriesID(id); err != nil {
p, err := sfile.DeleteSeriesID(id, NoFlush)
if err != nil {
sfile.Logger.Error(
"cannot delete series in shard",
zap.Uint64("series_id", id),
zap.Uint64("shard_id", shardID),
zap.String("series_file_path", sfile.Path()),
zap.Error(err))
} else {
partitionIDs[p.id] = struct{}{}
deleted := deletedCount.Add(1)

if deleted%DeleteLogTrigger == 0 {
s.Logger.Info(fmt.Sprintf("DeleteShard: %d series deleted", DeleteLogTrigger),
zap.String("db", db),
zap.Uint64("shard_id", shardID),
zap.String("series_file_path", sfile.Path()),
zap.Uint64("deleted", deleted),
zap.Uint64("remaining", seriesCount-deleted),
zap.Uint64("total", seriesCount),
zap.Duration("elapsed", time.Since(deleteStart)))
}
}
})

if err := sfile.FlushSegments(partitionIDs); err != nil {
sfile.Logger.Error(
"error while flushing a series file segment",
zap.Uint64("shard_id", shardID),
zap.String("series_file_path", sfile.Path()),
zap.Error(err))
}
}
}

Expand Down