diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go index 55fddc4ddbe..2f9cb71e242 100644 --- a/coordinator/points_writer.go +++ b/coordinator/points_writer.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "sort" + "strings" "sync" "sync/atomic" "time" @@ -95,23 +96,109 @@ func NewPointsWriter() *PointsWriter { } } +type BoundType int + +const ( + WithinBounds BoundType = iota + RetentionPolicyBound + WriteWindowUpperBound + WriteWindowLowerBound + MaxBoundType // always the largest bound type, not for actual use +) + +func (b BoundType) String() string { + switch b { + case RetentionPolicyBound: + return "Retention Policy Lower Bound" + case WriteWindowUpperBound: + return "Write Window Upper Bound" + case WriteWindowLowerBound: + return "Write Window Lower Bound" + case WithinBounds: + return "Within Bounds" + default: + return "Unknown" + } +} + +type DroppedPoint struct { + Point models.Point + ViolatedBound time.Time + Reason BoundType +} + +func (d *DroppedPoint) String() string { + return fmt.Sprintf("point %s at %s dropped because it violates a %s at %s", d.Point.Key(), d.Point.Time().UTC().Format(time.RFC3339Nano), d.Reason.String(), d.ViolatedBound.UTC().Format(time.RFC3339Nano)) +} + // ShardMapping contains a mapping of shards to points. type ShardMapping struct { - n int - Points map[uint64][]models.Point // The points associated with a shard ID - Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID - Dropped []models.Point // Points that were dropped + n int + Points map[uint64][]models.Point // The points associated with a shard ID + Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID + MaxDropped DroppedPoint + MinDropped DroppedPoint + RetentionDropped int + WriteWindowDropped int + rpi *meta.RetentionPolicyInfo } // NewShardMapping creates an empty ShardMapping. 
-func NewShardMapping(n int) *ShardMapping { +func NewShardMapping(rpi *meta.RetentionPolicyInfo, n int) *ShardMapping { return &ShardMapping{ n: n, Points: map[uint64][]models.Point{}, Shards: map[uint64]*meta.ShardInfo{}, + rpi: rpi, } } +func (s *ShardMapping) AddDropped(p models.Point, t time.Time, b BoundType) { + if s.MaxDropped.Point == nil || p.Time().After(s.MaxDropped.Point.Time()) { + s.MaxDropped = DroppedPoint{Point: p, ViolatedBound: t, Reason: b} + } + if s.MinDropped.Point == nil || p.Time().Before(s.MinDropped.Point.Time()) { + s.MinDropped = DroppedPoint{Point: p, ViolatedBound: t, Reason: b} + } + switch b { + case RetentionPolicyBound: + s.RetentionDropped++ + case WriteWindowLowerBound, WriteWindowUpperBound: + s.WriteWindowDropped++ + } +} + +func (s *ShardMapping) Dropped() int { + return s.RetentionDropped + s.WriteWindowDropped +} + +func (s *ShardMapping) SummariseDropped() string { + if s.Dropped() <= 0 { + return "" + } + summary := strings.Builder{} + if s.rpi.PastWriteLimit > 0 || s.rpi.FutureWriteLimit > 0 { + fmt.Fprintf(&summary, " and %d points outside write window (", s.WriteWindowDropped) + if s.rpi.PastWriteLimit > 0 { + summary.WriteString("-") + summary.WriteString(s.rpi.PastWriteLimit.String()) + } + if s.rpi.FutureWriteLimit > 0 { + if s.rpi.PastWriteLimit > 0 { + summary.WriteString(" to ") + } + summary.WriteString(s.rpi.FutureWriteLimit.String()) + } + summary.WriteString(")") + } + return fmt.Sprintf("dropped %d points outside retention policy of duration %s%s - oldest %s, newest %s", + s.RetentionDropped, + s.rpi.Duration.String(), + summary.String(), + s.MinDropped.String(), + s.MaxDropped.String()) +} + // MapPoint adds the point to the ShardMapping, associated with the given shardInfo.
func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point) { if cap(s.Points[shardInfo.ID]) < s.n { @@ -147,14 +234,14 @@ func NewWriteWindow(rp *meta.RetentionPolicyInfo) *WriteWindow { return w } -func (w *WriteWindow) WithinWindow(t time.Time) bool { +func (w *WriteWindow) WithinWindow(t time.Time) (bool, time.Time, BoundType) { if w.checkBefore && t.Before(w.before) { - return false + return false, w.before, WriteWindowLowerBound } if w.checkAfter && t.After(w.after) { - return false + return false, w.after, WriteWindowUpperBound } - return true + return true, time.Time{}, WithinBounds } // Open opens the communication channel with the point writer. @@ -229,7 +316,8 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) // Either the point is outside the scope of the RP, we already have // a suitable shard group for the point, or it is outside the write window // for the RP, and we don't want to unnecessarily create a shard for it - if p.Time().Before(min) || list.Covers(p.Time()) || !ww.WithinWindow(p.Time()) { + withinWindow, _, _ := ww.WithinWindow(p.Time()) + if p.Time().Before(min) || list.Covers(p.Time()) || !withinWindow { continue } @@ -246,13 +334,18 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) list.Add(*sg) } - mapping := NewShardMapping(len(wp.Points)) + mapping := NewShardMapping(rp, len(wp.Points)) for _, p := range wp.Points { sg := list.ShardGroupAt(p.Time()) - if sg == nil || !ww.WithinWindow(p.Time()) { + if sg == nil { // We didn't create a shard group because the point was outside the - // scope of the RP, or the point is outside the write window for the RP. 
- mapping.Dropped = append(mapping.Dropped, p) + // scope of the RP + mapping.AddDropped(p, min, RetentionPolicyBound) + atomic.AddInt64(&w.stats.WriteDropped, 1) + continue + } else if withinWindow, bound, reason := ww.WithinWindow(p.Time()); !withinWindow { + // The point is outside the write window for the RP. + mapping.AddDropped(p, bound, reason) atomic.AddInt64(&w.stats.WriteDropped, 1) continue } else if len(sg.Shards) <= 0 { @@ -420,9 +513,9 @@ func (w *PointsWriter) WritePointsPrivileged(writeCtx tsdb.WriteContext, databas w.Subscriber.Send(pts) atomic.AddInt64(&w.stats.SubWriteOK, 1) - if err == nil && len(shardMappings.Dropped) > 0 { - err = tsdb.PartialWriteError{Reason: "points beyond retention policy or outside permissible write window", - Dropped: len(shardMappings.Dropped), + if err == nil && shardMappings.Dropped() > 0 { + err = tsdb.PartialWriteError{Reason: shardMappings.SummariseDropped(), + Dropped: shardMappings.Dropped(), Database: database, RetentionPolicy: retentionPolicy, } diff --git a/coordinator/points_writer_test.go b/coordinator/points_writer_test.go index 31119fd91d2..92bf1e3ae15 100644 --- a/coordinator/points_writer_test.go +++ b/coordinator/points_writer_test.go @@ -1,6 +1,7 @@ package coordinator_test import ( + "errors" "fmt" "reflect" "sync" @@ -89,9 +90,11 @@ func TestPointsWriter_MapShards_WriteLimits(t *testing.T) { pr.AddPoint("cpu", -2.0, time.Now().Add(-time.Minute*20), nil) values := []float64{0.0, 1.0, -1.0} - dropped := []float64{2.0, -2.0} - MapPoints(t, c, pr, values, dropped) + MapPoints(t, c, pr, values, 2, + &coordinator.DroppedPoint{Point: pr.Points[4], Reason: coordinator.WriteWindowLowerBound}, + &coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound}, + "dropped 0 points outside retention policy of duration 3h0m0s and 2 points outside write window (-10m0s to 15m0s) -") // Clear the write limits by setting them to zero // No points should be dropped @@ -106,11 +109,28 @@ func 
TestPointsWriter_MapShards_WriteLimits(t *testing.T) { } require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed") values = []float64{0.0, 1.0, 2.0, -1.0, -2.0} - dropped = []float64{} - MapPoints(t, c, pr, values, dropped) + MapPoints(t, c, pr, values, 0, nil, nil, "dropped 0 points outside retention policy of duration 3h0m0s -") + + rpu.SetFutureWriteLimit(futureWriteLimit) + require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed") + values = []float64{0.0, 1.0, -1.0, -2.0} + MapPoints(t, c, pr, values, 1, + &coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound}, + &coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound}, + "dropped 0 points outside retention policy of duration 3h0m0s and 1 points outside write window (15m0s) -") + + rpu.SetFutureWriteLimit(zeroDuration) + rpu.SetPastWriteLimit(pastWriteLimit) + require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed") + values = []float64{0.0, 1.0, 2.0, -1.0} + MapPoints(t, c, pr, values, 1, + &coordinator.DroppedPoint{Point: pr.Points[4], Reason: coordinator.WriteWindowLowerBound}, + &coordinator.DroppedPoint{Point: pr.Points[4], Reason: coordinator.WriteWindowLowerBound}, + "dropped 0 points outside retention policy of duration 3h0m0s and 1 points outside write window (-10m0s) -") + } -func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WritePointsRequest, values []float64, dropped []float64) { +func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WritePointsRequest, values []float64, droppedCount int, minDropped *coordinator.DroppedPoint, maxDropped *coordinator.DroppedPoint, summary string) { var ( shardMappings *coordinator.ShardMapping err error @@ -141,7 +161,14 @@ func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WriteP } } verify(p, values) - verify(shardMappings.Dropped, dropped) + 
require.Equal(t, droppedCount, shardMappings.Dropped(), "wrong number of points dropped") + if shardMappings.Dropped() > 0 { + require.Equal(t, minDropped.Point, shardMappings.MinDropped.Point, "minimum dropped point mismatch") + require.Equal(t, minDropped.Reason, shardMappings.MinDropped.Reason, "minimum dropped reason mismatch") + require.Equal(t, maxDropped.Point, shardMappings.MaxDropped.Point, "maximum dropped point mismatch") + require.Equal(t, maxDropped.Reason, shardMappings.MaxDropped.Reason, "maximum dropped reason mismatch") + require.Contains(t, shardMappings.SummariseDropped(), summary, "summary mismatch") + } } // Ensures the points writer maps to a new shard group when the shard duration @@ -332,9 +359,11 @@ func TestPointsWriter_MapShards_Invalid(t *testing.T) { t.Errorf("MapShards() len mismatch. got %v, exp %v", got, exp) } - if got, exp := len(shardMappings.Dropped), 1; got != exp { + if got, exp := shardMappings.RetentionDropped, 1; got != exp { t.Fatalf("MapShard() dropped mismatch: got %v, exp %v", got, exp) } + + require.Equal(t, coordinator.RetentionPolicyBound, shardMappings.MinDropped.Reason, "unexpected reason for dropped point") } func TestPointsWriter_WritePoints(t *testing.T) { @@ -384,7 +413,7 @@ func TestPointsWriter_WritePoints(t *testing.T) { // copy to prevent data race theTest := test - sm := coordinator.NewShardMapping(16) + sm := coordinator.NewShardMapping(nil, 16) sm.MapPoint(&meta.ShardInfo{ID: uint64(1), Owners: []meta.ShardOwner{ {NodeID: 1}, {NodeID: 2}, @@ -467,16 +496,9 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) { // are created.
ms := NewPointsWriterMetaClient() - // Three points that range over the shardGroup duration (1h) and should map to two - // distinct shards + // Add a point earlier than the retention period pr.AddPoint("cpu", 1.0, time.Now().Add(-24*time.Hour), nil) - // copy to prevent data race - sm := coordinator.NewShardMapping(16) - - // ShardMapper dropped this point - sm.Dropped = append(sm.Dropped, pr.Points[0]) - // Local coordinator.Node ShardWriter // lock on the write increment since these functions get called in parallel var mu sync.Mutex @@ -506,13 +528,20 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) { c.Subscriber = sub c.Node = &influxdb.Node{ID: 1} - c.Open() - defer c.Close() + require.NoError(t, c.Open(), "failure opening PointsWriter") + defer func(pw *coordinator.PointsWriter) { + require.NoError(t, pw.Close(), "failure closing PointsWriter") + }(c) err := c.WritePointsPrivileged(tsdb.WriteContext{}, pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points) - if _, ok := err.(tsdb.PartialWriteError); !ok { + require.Error(t, err, "unexpected success writing points") + var pwErr tsdb.PartialWriteError + if !errors.As(err, &pwErr) { t.Errorf("PointsWriter.WritePoints(): got %v, exp %v", err, tsdb.PartialWriteError{}) } + require.Equal(t, 1, pwErr.Dropped, "wrong number of points dropped") + require.ErrorContains(t, pwErr, "partial write: dropped 1 points outside retention policy of duration 1h0m0s") + require.ErrorContains(t, pwErr, "Retention Policy Lower Bound") } type fakePointsWriter struct {