diff --git a/tsdb/shard.go b/tsdb/shard.go index 50209fb53c9..1a2b29f9e86 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -102,15 +102,23 @@ func (e ShardError) Unwrap() error { // PartialWriteError indicates a write request could only write a portion of the // requested values. type PartialWriteError struct { - Reason string - Dropped int - + Reason string + Dropped int + Database string + RetentionPolicy string // A sorted slice of series keys that were dropped. DroppedKeys [][]byte } func (e PartialWriteError) Error() string { - return fmt.Sprintf("partial write: %s dropped=%d", e.Reason, e.Dropped) + message := fmt.Sprintf("partial write: %s dropped=%d", e.Reason, e.Dropped) + if len(e.Database) > 0 { + message = fmt.Sprintf("%s for database: %s", message, e.Database) + } + if len(e.RetentionPolicy) > 0 { + message = fmt.Sprintf("%s for retention policy: %s", message, e.RetentionPolicy) + } + return message } // Shard represents a self-contained time series database. An inverted index of diff --git a/v1/coordinator/points_writer.go b/v1/coordinator/points_writer.go index 4247054470f..fc891fc210c 100644 --- a/v1/coordinator/points_writer.go +++ b/v1/coordinator/points_writer.go @@ -75,21 +75,91 @@ func NewPointsWriter(writeTimeout time.Duration, path string) *PointsWriter { } } +type BoundType int + +const ( + WithinBounds BoundType = iota + RetentionPolicyBound + WriteWindowUpperBound + WriteWindowLowerBound + MaxBoundType // always the largest bound type, not for actual use +) + +func (b BoundType) String() string { + switch b { + case RetentionPolicyBound: + return "Retention Policy Lower Bound" + case WriteWindowUpperBound: + return "Write Window Upper Bound" + case WriteWindowLowerBound: + return "Write Window Lower Bound" + case WithinBounds: + return "Within Bounds" + default: + return "Unknown" + } +} + +type DroppedPoint struct { + Point models.Point + ViolatedBound time.Time + Reason BoundType +} + +func (d *DroppedPoint) String() string { + return fmt.Sprintf("point %s at %s dropped because it violates a %s at %s", d.Point.Key(), d.Point.Time().UTC().Format(time.RFC3339Nano), d.Reason.String(), d.ViolatedBound.UTC().Format(time.RFC3339Nano)) +} + // ShardMapping contains a mapping of shards to points. type ShardMapping struct { - n int - Points map[uint64][]models.Point // The points associated with a shard ID - Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID - Dropped []models.Point // Points that were dropped + n int + Points map[uint64][]models.Point // The points associated with a shard ID + Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID + MaxDropped DroppedPoint + MinDropped DroppedPoint + RetentionDropped int + WriteWindowDropped int + rpi *meta.RetentionPolicyInfo } // NewShardMapping creates an empty ShardMapping. -func NewShardMapping(n int) *ShardMapping { +func NewShardMapping(rpi *meta.RetentionPolicyInfo, n int) *ShardMapping { return &ShardMapping{ n: n, Points: map[uint64][]models.Point{}, Shards: map[uint64]*meta.ShardInfo{}, + rpi: rpi, + } +} + +func (s *ShardMapping) AddDropped(p models.Point, t time.Time, b BoundType) { + if s.MaxDropped.Point == nil || p.Time().After(s.MaxDropped.Point.Time()) { + s.MaxDropped = DroppedPoint{Point: p, ViolatedBound: t, Reason: b} } + if s.MinDropped.Point == nil || p.Time().Before(s.MinDropped.Point.Time()) { + s.MinDropped = DroppedPoint{Point: p, ViolatedBound: t, Reason: b} + } + switch b { + case RetentionPolicyBound: + s.RetentionDropped++ + case WriteWindowLowerBound, WriteWindowUpperBound: + s.WriteWindowDropped++ + } +} + +func (s *ShardMapping) Dropped() int { + return s.RetentionDropped + s.WriteWindowDropped +} + +func (s *ShardMapping) SummariseDropped() string { + if s.Dropped() <= 0 { + return "" + } + return fmt.Sprintf("dropped %d points outside retention policy of duration %s - oldest %s, newest %s", + s.RetentionDropped, + s.rpi.Duration.String(), + s.MinDropped.String(), + s.MaxDropped.String()) } // MapPoint adds the point to the ShardMapping, associated with the given shardInfo. @@ -247,13 +317,13 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) list.Add(*sg) } - mapping := NewShardMapping(len(wp.Points)) + mapping := NewShardMapping(rp, len(wp.Points)) for _, p := range wp.Points { sg := list.ShardGroupAt(p.Time()) if sg == nil { // We didn't create a shard group because the point was outside the - // scope of the RP. - mapping.Dropped = append(mapping.Dropped, p) + // scope of the RP + mapping.AddDropped(p, min, RetentionPolicyBound) continue } else if len(sg.Shards) <= 0 { // Shard groups should have at least one shard. @@ -361,7 +431,7 @@ func (w *PointsWriter) WritePoints( ctx context.Context, database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, - user meta.User, + _ meta.User, points []models.Point, ) error { return w.WritePointsPrivileged(ctx, database, retentionPolicy, consistencyLevel, points) @@ -406,12 +476,18 @@ func (w *PointsWriter) WritePointsPrivileged( }(shardMappings.Shards[shardID], database, retentionPolicy, points) } - if len(shardMappings.Dropped) > 0 { - w.stats.pointsWriteDropped.Observe(float64(len(shardMappings.Dropped))) - err = tsdb.PartialWriteError{Reason: "points beyond retention policy", Dropped: len(shardMappings.Dropped)} - } timeout := time.NewTimer(w.WriteTimeout) defer timeout.Stop() + + if err == nil && shardMappings.Dropped() > 0 { + w.stats.pointsWriteDropped.Observe(float64(shardMappings.Dropped())) + err = tsdb.PartialWriteError{Reason: shardMappings.SummariseDropped(), + Dropped: shardMappings.Dropped(), + Database: database, + RetentionPolicy: retentionPolicy, + } + } + for range shardMappings.Points { select { case <-w.closing: diff --git a/v1/coordinator/points_writer_test.go b/v1/coordinator/points_writer_test.go index 3a2d2728526..d81890c868a 100644 --- a/v1/coordinator/points_writer_test.go +++ b/v1/coordinator/points_writer_test.go @@ -2,7 +2,9 @@ package coordinator_test import ( "context" + "errors" "fmt" + "github.com/stretchr/testify/require" "sync" "sync/atomic" "testing" @@ -51,6 +53,47 @@ func TestPointsWriter_MapShards_One(t *testing.T) { } } +func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WritePointsRequest, values []float64, droppedCount int, minDropped *coordinator.DroppedPoint, maxDropped *coordinator.DroppedPoint, summary string) { + var ( + shardMappings *coordinator.ShardMapping + err error + ) + if shardMappings, err = c.MapShards(pr); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if exp := 1; len(shardMappings.Points) != exp { + t.Errorf("MapShards() len mismatch. got %v, exp %v", len(shardMappings.Points), exp) + } + + p := func() []models.Point { + for _, v := range shardMappings.Points { + return v + } + return nil + }() + verify := + func(p []models.Point, values []float64) { + require.Equal(t, len(values), len(p), "unexpected number of points") + for i, expV := range values { + f, err := p[i].Fields() + require.NoError(t, err, "error retrieving fields") + v, ok := f["value"] + require.True(t, ok, "\"value\" field not found") + require.Equal(t, expV, v, "unexpected value") + } + } + verify(p, values) + require.Equal(t, shardMappings.Dropped(), droppedCount, "wrong number of points dropped") + if shardMappings.Dropped() > 0 { + require.Equal(t, minDropped.Point, shardMappings.MinDropped.Point, "minimum dropped point mismatch") + require.Equal(t, minDropped.Reason, shardMappings.MinDropped.Reason, "minimum dropped reason mismatch") + require.Equal(t, maxDropped.Point, shardMappings.MaxDropped.Point, "maximum dropped point mismatch") + require.Equal(t, maxDropped.Reason, shardMappings.MaxDropped.Reason, "maximum dropped reason mismatch") + require.Contains(t, shardMappings.SummariseDropped(), summary, "summary mismatch") + } +} + // Ensures the points writer maps to a new shard group when the shard duration // is changed. func TestPointsWriter_MapShards_AlterShardDuration(t *testing.T) { @@ -239,9 +282,11 @@ func TestPointsWriter_MapShards_Invalid(t *testing.T) { t.Errorf("MapShards() len mismatch. got %v, exp %v", got, exp) } - if got, exp := len(shardMappings.Dropped), 1; got != exp { + if got, exp := shardMappings.RetentionDropped, 1; got != exp { t.Fatalf("MapShard() dropped mismatch: got %v, exp %v", got, exp) } + + require.Equal(t, coordinator.RetentionPolicyBound, shardMappings.MinDropped.Reason, "unexpected reason for dropped point") } func TestPointsWriter_WritePoints(t *testing.T) { @@ -288,7 +333,7 @@ func TestPointsWriter_WritePoints(t *testing.T) { pr.AddPoint("cpu", 3.0, time.Now().Add(time.Hour+time.Second), nil) // copy to prevent data race - sm := coordinator.NewShardMapping(16) + sm := coordinator.NewShardMapping(nil, 16) sm.MapPoint( &meta.ShardInfo{ID: uint64(1), Owners: []meta.ShardOwner{ {NodeID: 1}, @@ -360,16 +405,9 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) { // are created. ms := NewPointsWriterMetaClient() - // Three points that range over the shardGroup duration (1h) and should map to two - // distinct shards + // Add a point earlier than the retention period pr.AddPoint("cpu", 1.0, time.Now().Add(-24*time.Hour), nil) - // copy to prevent data race - sm := coordinator.NewShardMapping(16) - - // ShardMapper dropped this point - sm.Dropped = append(sm.Dropped, pr.Points[0]) - // Local coordinator.Node ShardWriter // lock on the write increment since these functions get called in parallel var mu sync.Mutex @@ -392,13 +430,20 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) { c.TSDBStore = store c.Node = &influxdb.Node{ID: 1} - c.Open() - defer c.Close() + require.NoError(t, c.Open(), "failure opening PointsWriter") + defer func(pw *coordinator.PointsWriter) { + require.NoError(t, pw.Close(), "failure closing PointsWriter") + }(c) err := c.WritePointsPrivileged(context.Background(), pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points) - if _, ok := err.(tsdb.PartialWriteError); !ok { + require.Error(t, err, "unexpected success writing points") + var pwErr tsdb.PartialWriteError + if !errors.As(err, &pwErr) { t.Errorf("PointsWriter.WritePoints(): got %v, exp %v", err, tsdb.PartialWriteError{}) } + require.Equal(t, 1, pwErr.Dropped, "wrong number of points dropped") + require.ErrorContains(t, pwErr, "partial write: dropped 1 points outside retention policy of duration 1h0m0s") + require.ErrorContains(t, pwErr, "Retention Policy Lower Bound") } var shardID uint64