From 053827331574be432a81f2905b5db35b56b469e7 Mon Sep 17 00:00:00 2001 From: davidby-influx Date: Fri, 11 Apr 2025 17:46:13 -0700 Subject: [PATCH 1/5] feat: improve dropped point logging Log the reason for a point being dropped, the type of boundary violated, and the time that was the boundary. Prints the maximum and minimum points (by time) that were dropped closes https://github.com/influxdata/influxdb/issues/26252 --- coordinator/points_writer.go | 92 ++++++++++++++++++++++++++----- coordinator/points_writer_test.go | 22 +++++--- 2 files changed, 91 insertions(+), 23 deletions(-) diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go index 55fddc4ddbe..354de36aba4 100644 --- a/coordinator/points_writer.go +++ b/coordinator/points_writer.go @@ -95,12 +95,48 @@ func NewPointsWriter() *PointsWriter { } } +type BoundType int + +const ( + WithinBounds BoundType = iota + RetentionPolicyBound + WriteWindowUpperBound + WriteWindowLowerBound +) + +func (b BoundType) String() string { + switch b { + case RetentionPolicyBound: + return "Retention Policy Lower Bound" + case WriteWindowUpperBound: + return "Write Window Upper Bound" + case WriteWindowLowerBound: + return "Write Window Lower Bound" + case WithinBounds: + return "Within Bounds" + default: + return "Unknown" + } +} + +type DroppedPoint struct { + Point models.Point + ViolatedBound time.Time + Reason BoundType +} + +func (d *DroppedPoint) String() string { + return fmt.Sprintf("point %s at %s dropped because it violates a %s at %s", d.Point.Key(), d.Point.Time(), d.Reason.String(), d.ViolatedBound) +} + // ShardMapping contains a mapping of shards to points. type ShardMapping struct { - n int - Points map[uint64][]models.Point // The points associated with a shard ID - Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID - Dropped []models.Point // Points that were dropped + n int + Points map[uint64][]models.Point // The points associated with a shard ID + Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID + MaxDropped DroppedPoint + MinDropped DroppedPoint + CountDropped int } // NewShardMapping creates an empty ShardMapping. @@ -112,6 +148,26 @@ func NewShardMapping(n int) *ShardMapping { } } +func (s *ShardMapping) AddDropped(p models.Point, t time.Time, b BoundType) { + if s.MaxDropped.Point == nil || p.Time().After(s.MaxDropped.Point.Time()) { + s.MaxDropped = DroppedPoint{Point: p, ViolatedBound: t, Reason: b} + } + if s.MinDropped.Point == nil || p.Time().Before(s.MinDropped.Point.Time()) { + s.MinDropped = DroppedPoint{Point: p, ViolatedBound: t, Reason: b} + } + s.CountDropped++ +} + +func (s *ShardMapping) SummariseDropped() string { + if s.CountDropped == 0 { + return "" + } + return fmt.Sprintf("dropped %d points outside retention policy or write window: %s to %s", + s.CountDropped, + s.MinDropped.String(), + s.MaxDropped.String()) +} + // MapPoint adds the point to the ShardMapping, associated with the given shardInfo. func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point) { if cap(s.Points[shardInfo.ID]) < s.n { @@ -147,14 +203,14 @@ func NewWriteWindow(rp *meta.RetentionPolicyInfo) *WriteWindow { return w } -func (w *WriteWindow) WithinWindow(t time.Time) bool { +func (w *WriteWindow) WithinWindow(t time.Time) (bool, time.Time, BoundType) { if w.checkBefore && t.Before(w.before) { - return false + return false, w.before, WriteWindowLowerBound } if w.checkAfter && t.After(w.after) { - return false + return false, w.after, WriteWindowUpperBound } - return true + return true, time.Time{}, WithinBounds } // Open opens the communication channel with the point writer. @@ -229,7 +285,8 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) // Either the point is outside the scope of the RP, we already have // a suitable shard group for the point, or it is outside the write window // for the RP, and we don't want to unnecessarily create a shard for it - if p.Time().Before(min) || list.Covers(p.Time()) || !ww.WithinWindow(p.Time()) { + withinWindow, _, _ := ww.WithinWindow(p.Time()) + if p.Time().Before(min) || list.Covers(p.Time()) || !withinWindow { continue } @@ -249,10 +306,15 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) mapping := NewShardMapping(len(wp.Points)) for _, p := range wp.Points { sg := list.ShardGroupAt(p.Time()) - if sg == nil || !ww.WithinWindow(p.Time()) { + if sg == nil { // We didn't create a shard group because the point was outside the - // scope of the RP, or the point is outside the write window for the RP. - mapping.Dropped = append(mapping.Dropped, p) + // scope of the RP + mapping.AddDropped(p, min, RetentionPolicyBound) + atomic.AddInt64(&w.stats.WriteDropped, 1) + continue + } else if withinWindow, bound, reason := ww.WithinWindow(p.Time()); !withinWindow { + // The point is outside the write window for the RP. + mapping.AddDropped(p, bound, reason) atomic.AddInt64(&w.stats.WriteDropped, 1) continue } else if len(sg.Shards) <= 0 { @@ -420,9 +482,9 @@ func (w *PointsWriter) WritePointsPrivileged(writeCtx tsdb.WriteContext, databas w.Subscriber.Send(pts) atomic.AddInt64(&w.stats.SubWriteOK, 1) - if err == nil && len(shardMappings.Dropped) > 0 { - err = tsdb.PartialWriteError{Reason: "points beyond retention policy or outside permissible write window", - Dropped: len(shardMappings.Dropped), + if err == nil && shardMappings.CountDropped > 0 { + err = tsdb.PartialWriteError{Reason: shardMappings.SummariseDropped(), + Dropped: shardMappings.CountDropped, Database: database, RetentionPolicy: retentionPolicy, } diff --git a/coordinator/points_writer_test.go b/coordinator/points_writer_test.go index 31119fd91d2..7ae6ccadc81 100644 --- a/coordinator/points_writer_test.go +++ b/coordinator/points_writer_test.go @@ -89,9 +89,10 @@ func TestPointsWriter_MapShards_WriteLimits(t *testing.T) { pr.AddPoint("cpu", -2.0, time.Now().Add(-time.Minute*20), nil) values := []float64{0.0, 1.0, -1.0} - dropped := []float64{2.0, -2.0} - MapPoints(t, c, pr, values, dropped) + MapPoints(t, c, pr, values, 2, + &coordinator.DroppedPoint{Point: pr.Points[4], Reason: coordinator.WriteWindowLowerBound}, + &coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound}) // Clear the write limits by setting them to zero // No points should be dropped @@ -106,11 +107,10 @@ func TestPointsWriter_MapShards_WriteLimits(t *testing.T) { } require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed") values = []float64{0.0, 1.0, 2.0, -1.0, -2.0} - dropped = []float64{} - MapPoints(t, c, pr, values, dropped) + MapPoints(t, c, pr, values, 0, nil, nil) } -func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WritePointsRequest, values []float64, dropped []float64) { +func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WritePointsRequest, values []float64, droppedCount int, minDropped *coordinator.DroppedPoint, maxDropped *coordinator.DroppedPoint) { var ( shardMappings *coordinator.ShardMapping err error @@ -141,7 +141,13 @@ func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WriteP } } verify(p, values) - verify(shardMappings.Dropped, dropped) + require.Equal(t, shardMappings.CountDropped, droppedCount, "wrong number of points dropped") + if shardMappings.CountDropped > 0 { + require.Equal(t, minDropped.Point, shardMappings.MinDropped.Point, "minimum dropped point mismatch") + require.Equal(t, minDropped.Reason, shardMappings.MinDropped.Reason, "minimum dropped reason mismatch") + require.Equal(t, maxDropped.Point, shardMappings.MaxDropped.Point, "maximum dropped point mismatch") + require.Equal(t, maxDropped.Reason, shardMappings.MaxDropped.Reason, "maximum dropped reason mismatch") + } } // Ensures the points writer maps to a new shard group when the shard duration @@ -332,7 +338,7 @@ func TestPointsWriter_MapShards_Invalid(t *testing.T) { t.Errorf("MapShards() len mismatch. got %v, exp %v", got, exp) } - if got, exp := len(shardMappings.Dropped), 1; got != exp { + if got, exp := shardMappings.CountDropped, 1; got != exp { t.Fatalf("MapShard() dropped mismatch: got %v, exp %v", got, exp) } } @@ -475,7 +481,7 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) { sm := coordinator.NewShardMapping(16) // ShardMapper dropped this point - sm.Dropped = append(sm.Dropped, pr.Points[0]) + sm.AddDropped(pr.Points[0], time.Time{}, coordinator.RetentionPolicyBound) // Local coordinator.Node ShardWriter // lock on the write increment since these functions get called in parallel From 3741b160e2e7ca5e313f2b727c8d5a1949406249 Mon Sep 17 00:00:00 2001 From: davidby-influx Date: Mon, 14 Apr 2025 13:39:37 -0700 Subject: [PATCH 2/5] fix: better time formatting and additional testing --- coordinator/points_writer.go | 2 +- coordinator/points_writer_test.go | 25 ++++++++++++++----------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go index 354de36aba4..3b7ffc18f8f 100644 --- a/coordinator/points_writer.go +++ b/coordinator/points_writer.go @@ -126,7 +126,7 @@ type DroppedPoint struct { } func (d *DroppedPoint) String() string { - return fmt.Sprintf("point %s at %s dropped because it violates a %s at %s", d.Point.Key(), d.Point.Time(), d.Reason.String(), d.ViolatedBound) + return fmt.Sprintf("point %s at %s dropped because it violates a %s at %s", d.Point.Key(), d.Point.Time().UTC().Format(time.RFC3339Nano), d.Reason.String(), d.ViolatedBound.UTC().Format(time.RFC3339Nano)) } // ShardMapping contains a mapping of shards to points. diff --git a/coordinator/points_writer_test.go b/coordinator/points_writer_test.go index 7ae6ccadc81..345eeb7f645 100644 --- a/coordinator/points_writer_test.go +++ b/coordinator/points_writer_test.go @@ -1,6 +1,7 @@ package coordinator_test import ( + "errors" "fmt" "reflect" "sync" @@ -341,6 +342,8 @@ func TestPointsWriter_MapShards_Invalid(t *testing.T) { if got, exp := shardMappings.CountDropped, 1; got != exp { t.Fatalf("MapShard() dropped mismatch: got %v, exp %v", got, exp) } + + require.Equal(t, coordinator.RetentionPolicyBound, shardMappings.MinDropped.Reason, "unexpected reason for dropped point") } func TestPointsWriter_WritePoints(t *testing.T) { @@ -473,16 +476,9 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) { // are created. ms := NewPointsWriterMetaClient() - // Three points that range over the shardGroup duration (1h) and should map to two - // distinct shards + // Add a point earlier than the retention period pr.AddPoint("cpu", 1.0, time.Now().Add(-24*time.Hour), nil) - // copy to prevent data race - sm := coordinator.NewShardMapping(16) - - // ShardMapper dropped this point - sm.AddDropped(pr.Points[0], time.Time{}, coordinator.RetentionPolicyBound) - // Local coordinator.Node ShardWriter // lock on the write increment since these functions get called in parallel var mu sync.Mutex @@ -512,13 +508,20 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) { c.Subscriber = sub c.Node = &influxdb.Node{ID: 1} - c.Open() - defer c.Close() + require.NoError(t, c.Open(), "failure opening PointsWriter") + defer func(pw *coordinator.PointsWriter) { + require.NoError(t, pw.Close(), "failure closing PointsWriter") + }(c) err := c.WritePointsPrivileged(tsdb.WriteContext{}, pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points) - if _, ok := err.(tsdb.PartialWriteError); !ok { + require.Error(t, err, "unexpected success writing points") + var pwErr tsdb.PartialWriteError + if !errors.As(err, &pwErr) { t.Errorf("PointsWriter.WritePoints(): got %v, exp %v", err, tsdb.PartialWriteError{}) } + require.Equal(t, 1, pwErr.Dropped, "wrong number of points dropped") + require.ErrorContains(t, pwErr, "dropped 1 points outside retention policy or write window") + require.ErrorContains(t, pwErr, "Retention Policy Lower Bound") } type fakePointsWriter struct { From b1efaf73c913f6058b51222938fdae721be723c6 Mon Sep 17 00:00:00 2001 From: davidby-influx Date: Wed, 16 Apr 2025 11:48:20 -0700 Subject: [PATCH 3/5] fix: differentiate point time boundary violations --- coordinator/points_writer.go | 38 +++++++++++++++++++++---------- coordinator/points_writer_test.go | 8 +++---- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go index 3b7ffc18f8f..362696b3c12 100644 --- a/coordinator/points_writer.go +++ b/coordinator/points_writer.go @@ -102,6 +102,7 @@ const ( RetentionPolicyBound WriteWindowUpperBound WriteWindowLowerBound + MaxBoundType // always the largest bound type, not for actual use ) func (b BoundType) String() string { @@ -131,12 +132,13 @@ func (d *DroppedPoint) String() string { // ShardMapping contains a mapping of shards to points. type ShardMapping struct { - n int - Points map[uint64][]models.Point // The points associated with a shard ID - Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID - MaxDropped DroppedPoint - MinDropped DroppedPoint - CountDropped int + n int + Points map[uint64][]models.Point // The points associated with a shard ID + Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID + MaxDropped DroppedPoint + MinDropped DroppedPoint + RetentionDropped int + WriteWindowDropped int } // NewShardMapping creates an empty ShardMapping. @@ -155,15 +157,27 @@ func (s *ShardMapping) AddDropped(p models.Point, t time.Time, b BoundType) { if s.MinDropped.Point == nil || p.Time().Before(s.MinDropped.Point.Time()) { s.MinDropped = DroppedPoint{Point: p, ViolatedBound: t, Reason: b} } - s.CountDropped++ + switch b { + case RetentionPolicyBound: + s.RetentionDropped++ + case WriteWindowLowerBound: + s.WriteWindowDropped++ + case WriteWindowUpperBound: + s.WriteWindowDropped++ + } +} + +func (s *ShardMapping) Dropped() int { + return s.RetentionDropped + s.WriteWindowDropped } func (s *ShardMapping) SummariseDropped() string { - if s.CountDropped == 0 { + if s.Dropped() <= 0 { return "" } - return fmt.Sprintf("dropped %d points outside retention policy or write window: %s to %s", - s.CountDropped, + return fmt.Sprintf("dropped %d points outside retention policy and %d points outside write window: %s to %s", + s.RetentionDropped, + s.WriteWindowDropped, s.MinDropped.String(), s.MaxDropped.String()) } @@ -482,9 +496,9 @@ func (w *PointsWriter) WritePointsPrivileged(writeCtx tsdb.WriteContext, databas w.Subscriber.Send(pts) atomic.AddInt64(&w.stats.SubWriteOK, 1) - if err == nil && shardMappings.CountDropped > 0 { + if err == nil && shardMappings.Dropped() > 0 { err = tsdb.PartialWriteError{Reason: shardMappings.SummariseDropped(), - Dropped: shardMappings.CountDropped, + Dropped: shardMappings.Dropped(), Database: database, RetentionPolicy: retentionPolicy, } diff --git a/coordinator/points_writer_test.go b/coordinator/points_writer_test.go index 345eeb7f645..1e95705a7ba 100644 --- a/coordinator/points_writer_test.go +++ b/coordinator/points_writer_test.go @@ -142,8 +142,8 @@ func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WriteP } } verify(p, values) - require.Equal(t, shardMappings.CountDropped, droppedCount, "wrong number of points dropped") - if shardMappings.CountDropped > 0 { + require.Equal(t, shardMappings.Dropped(), droppedCount, "wrong number of points dropped") + if shardMappings.Dropped() > 0 { require.Equal(t, minDropped.Point, shardMappings.MinDropped.Point, "minimum dropped point mismatch") require.Equal(t, minDropped.Reason, shardMappings.MinDropped.Reason, "minimum dropped reason mismatch") require.Equal(t, maxDropped.Point, shardMappings.MaxDropped.Point, "maximum dropped point mismatch") @@ -339,7 +339,7 @@ func TestPointsWriter_MapShards_Invalid(t *testing.T) { t.Errorf("MapShards() len mismatch. got %v, exp %v", got, exp) } - if got, exp := shardMappings.CountDropped, 1; got != exp { + if got, exp := shardMappings.RetentionDropped, 1; got != exp { t.Fatalf("MapShard() dropped mismatch: got %v, exp %v", got, exp) } @@ -520,7 +520,7 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) { t.Errorf("PointsWriter.WritePoints(): got %v, exp %v", err, tsdb.PartialWriteError{}) } require.Equal(t, 1, pwErr.Dropped, "wrong number of points dropped") - require.ErrorContains(t, pwErr, "dropped 1 points outside retention policy or write window") + require.ErrorContains(t, pwErr, "dropped 1 points outside retention policy and 0 points outside write window") require.ErrorContains(t, pwErr, "Retention Policy Lower Bound") } From e10e9ca45edd50b4fb1cade5d08c1b56b296df04 Mon Sep 17 00:00:00 2001 From: davidby-influx Date: Wed, 16 Apr 2025 14:02:57 -0700 Subject: [PATCH 4/5] chore: clean up switch statement --- coordinator/points_writer.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go index 362696b3c12..27b80b4fe2b 100644 --- a/coordinator/points_writer.go +++ b/coordinator/points_writer.go @@ -160,9 +160,7 @@ func (s *ShardMapping) AddDropped(p models.Point, t time.Time, b BoundType) { switch b { case RetentionPolicyBound: s.RetentionDropped++ - case WriteWindowLowerBound: - s.WriteWindowDropped++ - case WriteWindowUpperBound: + case WriteWindowLowerBound, WriteWindowUpperBound: s.WriteWindowDropped++ } } From bd5b1bdf38decda9585bd90b1155628e4636b7af Mon Sep 17 00:00:00 2001 From: davidby-influx Date: Fri, 18 Apr 2025 14:20:27 -0700 Subject: [PATCH 5/5] fix: improve error messages --- coordinator/points_writer.go | 27 +++++++++++++++++++++++---- coordinator/points_writer_test.go | 30 +++++++++++++++++++++++++----- 2 files changed, 48 insertions(+), 9 deletions(-) diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go index 27b80b4fe2b..2f9cb71e242 100644 --- a/coordinator/points_writer.go +++ b/coordinator/points_writer.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "sort" + "strings" "sync" "sync/atomic" "time" @@ -139,14 +140,16 @@ type ShardMapping struct { MinDropped DroppedPoint RetentionDropped int WriteWindowDropped int + rpi *meta.RetentionPolicyInfo } // NewShardMapping creates an empty ShardMapping. -func NewShardMapping(n int) *ShardMapping { +func NewShardMapping(rpi *meta.RetentionPolicyInfo, n int) *ShardMapping { return &ShardMapping{ n: n, Points: map[uint64][]models.Point{}, Shards: map[uint64]*meta.ShardInfo{}, + rpi: rpi, } } @@ -173,9 +176,25 @@ func (s *ShardMapping) SummariseDropped() string { if s.Dropped() <= 0 { return "" } - return fmt.Sprintf("dropped %d points outside retention policy and %d points outside write window: %s to %s", + summary := strings.Builder{} + if s.rpi.PastWriteLimit > 0 || s.rpi.FutureWriteLimit > 0 { + summary.WriteString(fmt.Sprintf(" and %d points outside write window (", s.WriteWindowDropped)) + if s.rpi.PastWriteLimit > 0 { + summary.WriteString("-") + summary.WriteString(s.rpi.PastWriteLimit.String()) + } + if s.rpi.FutureWriteLimit > 0 { + if s.rpi.PastWriteLimit > 0 { + summary.WriteString(" to ") + } + summary.WriteString(s.rpi.FutureWriteLimit.String()) + } + summary.WriteString(")") + } + return fmt.Sprintf("dropped %d points outside retention policy of duration %s%s - oldest %s, newest %s", s.RetentionDropped, - s.WriteWindowDropped, + s.rpi.Duration.String(), + summary.String(), s.MinDropped.String(), s.MaxDropped.String()) } @@ -315,7 +334,7 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) list.Add(*sg) } - mapping := NewShardMapping(len(wp.Points)) + mapping := NewShardMapping(rp, len(wp.Points)) for _, p := range wp.Points { sg := list.ShardGroupAt(p.Time()) if sg == nil { diff --git a/coordinator/points_writer_test.go b/coordinator/points_writer_test.go index 1e95705a7ba..92bf1e3ae15 100644 --- a/coordinator/points_writer_test.go +++ b/coordinator/points_writer_test.go @@ -93,7 +93,8 @@ func TestPointsWriter_MapShards_WriteLimits(t *testing.T) { MapPoints(t, c, pr, values, 2, &coordinator.DroppedPoint{Point: pr.Points[4], Reason: coordinator.WriteWindowLowerBound}, - &coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound}) + &coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound}, + "dropped 0 points outside retention policy of duration 3h0m0s and 2 points outside write window (-10m0s to 15m0s) -") // Clear the write limits by setting them to zero // No points should be dropped @@ -108,10 +109,28 @@ func TestPointsWriter_MapShards_WriteLimits(t *testing.T) { } require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed") values = []float64{0.0, 1.0, 2.0, -1.0, -2.0} - MapPoints(t, c, pr, values, 0, nil, nil) + MapPoints(t, c, pr, values, 0, nil, nil, "dropped 0 points outside retention policy of duration 3h0m0s -") + + rpu.SetFutureWriteLimit(futureWriteLimit) + require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed") + values = []float64{0.0, 1.0, -1.0, -2.0} + MapPoints(t, c, pr, values, 1, + &coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound}, + &coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound}, + "dropped 0 points outside retention policy of duration 3h0m0s and 1 points outside write window (15m0s) -") + + rpu.SetFutureWriteLimit(zeroDuration) + rpu.SetPastWriteLimit(pastWriteLimit) + require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed") + values = []float64{0.0, 1.0, 2.0, -1.0} + MapPoints(t, c, pr, values, 1, + &coordinator.DroppedPoint{Point: pr.Points[4], Reason: coordinator.WriteWindowLowerBound}, + &coordinator.DroppedPoint{Point: pr.Points[4], Reason: coordinator.WriteWindowLowerBound}, + "dropped 0 points outside retention policy of duration 3h0m0s and 1 points outside write window (-10m0s) -") + } -func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WritePointsRequest, values []float64, droppedCount int, minDropped *coordinator.DroppedPoint, maxDropped *coordinator.DroppedPoint) { +func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WritePointsRequest, values []float64, droppedCount int, minDropped *coordinator.DroppedPoint, maxDropped *coordinator.DroppedPoint, summary string) { var ( shardMappings *coordinator.ShardMapping err error @@ -148,6 +167,7 @@ func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WriteP require.Equal(t, minDropped.Reason, shardMappings.MinDropped.Reason, "minimum dropped reason mismatch") require.Equal(t, maxDropped.Point, shardMappings.MaxDropped.Point, "maximum dropped point mismatch") require.Equal(t, maxDropped.Reason, shardMappings.MaxDropped.Reason, "maximum dropped reason mismatch") + require.Contains(t, shardMappings.SummariseDropped(), summary, "summary mismatch") } } @@ -393,7 +413,7 @@ func TestPointsWriter_WritePoints(t *testing.T) { // copy to prevent data race theTest := test - sm := coordinator.NewShardMapping(16) + sm := coordinator.NewShardMapping(nil, 16) sm.MapPoint(&meta.ShardInfo{ID: uint64(1), Owners: []meta.ShardOwner{ {NodeID: 1}, {NodeID: 2}, @@ -520,7 +540,7 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) { t.Errorf("PointsWriter.WritePoints(): got %v, exp %v", err, tsdb.PartialWriteError{}) } require.Equal(t, 1, pwErr.Dropped, "wrong number of points dropped") - require.ErrorContains(t, pwErr, "dropped 1 points outside retention policy and 0 points outside write window") + require.ErrorContains(t, pwErr, "partial write: dropped 1 points outside retention policy of duration 1h0m0s") require.ErrorContains(t, pwErr, "Retention Policy Lower Bound") }