Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (s *Server) appendRetentionPolicyService(c retention.Config) {
return
}
srv := retention.NewService(c)
srv.MetaClient = s.MetaClient
srv.SetOSSMetaClient(s.MetaClient)
srv.TSDBStore = s.TSDBStore
srv.DropShardMetaRef = retention.OSSDropShardMetaRef(s.MetaClient)
s.Services = append(s.Services, srv)
Expand Down
67 changes: 55 additions & 12 deletions services/retention/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package retention // import "github.com/influxdata/influxdb/services/retention"
import (
"errors"
"fmt"
"math"
"slices"
"sync"
"time"

Expand All @@ -13,13 +15,37 @@ import (
"go.uber.org/zap"
)

type MetaClient interface {
type OSSMetaClient interface {
Databases() []meta.DatabaseInfo
DeleteShardGroup(database, policy string, id uint64) error
DropShard(id uint64) error
PruneShardGroups() error
}

type MetaClient interface {
OSSMetaClient
NodeID() uint64
}

const (
// ossNodeID is a special node ID for OSS nodes. No enterprise node will ever have this node ID.
// 0 can not be used because there is a brief period on startup before meta-client initialization
// and before joining a cluster that NodeID() == 0.
ossNodeID uint64 = math.MaxUint64
)

// ossMetaClientAdapter adds methods the retention service needs to the OSS meta.Client implementation.
// OSSMetaClient is decorated with methods needed for the Enterprise retention service instead of adding
// them to the OSS MetaClient to avoid polluting the OSS MetaClient namespace.
type ossMetaClientAdapter struct {
OSSMetaClient
}

// NodeID returns the magic ossNodeID identifier.
func (c *ossMetaClientAdapter) NodeID() uint64 {
return ossNodeID
}

// Service represents the retention policy enforcement service.
type Service struct {
MetaClient
Expand Down Expand Up @@ -58,12 +84,20 @@ func NewService(c Config) *Service {
}

// OSSDropShardMetaRef creates a closure appropriate for OSS to use as DropShardMetaRef.
func OSSDropShardMetaRef(mc MetaClient) func(uint64, []uint64) error {
func OSSDropShardMetaRef(mc OSSMetaClient) func(uint64, []uint64) error {
return func(shardID uint64, owners []uint64) error {
return mc.DropShard(shardID)
}
}

func (s *Service) SetMetaClient(c MetaClient) {
s.MetaClient = c
}

func (s *Service) SetOSSMetaClient(c OSSMetaClient) {
s.SetMetaClient(&ossMetaClientAdapter{OSSMetaClient: c})
}

// Open starts retention policy enforcement.
func (s *Service) Open() error {
if !s.config.Enabled || s.done != nil {
Expand Down Expand Up @@ -116,6 +150,10 @@ func (s *Service) run() {
}
}

func (s *Service) isOSS() bool {
return s.NodeID() == ossNodeID
}

func (s *Service) DeletionCheck() {
log, logEnd := logger.NewOperation(s.logger, "Retention policy deletion check", "retention_delete_check")
defer logEnd()
Expand Down Expand Up @@ -244,16 +282,21 @@ func (s *Service) DeletionCheck() {

// Check for expired phantom shards that exist in the metadata but not in the store.
for id, info := range deletedShardIDs {
func() {
log, logEnd := logger.NewOperation(log, "Drop phantom shard references", "retention_drop_phantom_refs",
logger.Database(info.db), logger.Shard(id), logger.RetentionPolicy(info.rp), zap.Uint64s("owners", info.owners))
defer logEnd()
log.Warn("Expired phantom shard detected during retention check, removing from metadata")
if err := s.DropShardMetaRef(id, info.owners); err != nil {
log.Error("Error dropping shard meta reference for phantom shard", zap.Error(err))
retryNeeded = true
}
}()
// Enterprise tracks shard ownership while OSS does not because it is single node. A shard not in the
// TSDB but in the metadata is always a phantom shard for OSS. For enterprise, it is only a phantom shard
// if this node is supposed to own the shard according to the metadata.
if s.isOSS() || slices.Contains(info.owners, s.NodeID()) {
func() {
log, logEnd := logger.NewOperation(log, "Drop phantom shard references", "retention_drop_phantom_refs",
logger.Database(info.db), logger.Shard(id), logger.RetentionPolicy(info.rp), zap.Uint64s("owners", info.owners))
defer logEnd()
log.Warn("Expired phantom shard detected during retention check, removing from metadata")
if err := s.DropShardMetaRef(id, info.owners); err != nil {
log.Error("Error dropping shard meta reference for phantom shard", zap.Error(err))
retryNeeded = true
}
}()
}
}

func() {
Expand Down
4 changes: 2 additions & 2 deletions services/retention/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func TestRetention_DeletionCheck(t *testing.T) {
}

s := retention.NewService(cfg)
s.MetaClient = mc
s.SetOSSMetaClient(mc)
s.TSDBStore = store
s.DropShardMetaRef = retention.OSSDropShardMetaRef(s.MetaClient)
require.NoError(t, s.Open())
Expand Down Expand Up @@ -721,7 +721,7 @@ func NewService(c retention.Config) *Service {
l := logger.New(&s.LogBuf)
s.WithLogger(l)

s.Service.MetaClient = s.MetaClient
s.Service.SetOSSMetaClient(s.MetaClient)
s.Service.TSDBStore = s.TSDBStore
s.Service.DropShardMetaRef = retention.OSSDropShardMetaRef(s.Service.MetaClient)
return s
Expand Down