From 1efa2b7af93edfe52c4e6eada9c598204a8d14d5 Mon Sep 17 00:00:00 2001 From: davidby-influx <72418212+davidby-influx@users.noreply.github.com> Date: Fri, 18 Apr 2025 15:18:19 -0700 Subject: [PATCH 1/2] feat: improve dropped point logging (#26257) Log the reason for a point being dropped, the type of boundary violated, and the time that was the boundary. Prints the maximum and minimum points (by time) that were dropped closes https://github.com/influxdata/influxdb/issues/26252 * fix: better time formatting and additional testing * fix: differentiate point time boundary violations * chore: clean up switch statement * fix: improve error messages (cherry picked from commit 62e803e673ace7aeb3204b5b13c55665cf1fc1a8) closes https://github.com/influxdata/influxdb/issues/26295 --- tsdb/shard.go | 16 ++- v1/coordinator/points_writer.go | 102 ++++++++++++++-- v1/coordinator/points_writer_test.go | 174 +++++++++++++++++++++++++-- 3 files changed, 264 insertions(+), 28 deletions(-) diff --git a/tsdb/shard.go b/tsdb/shard.go index 50209fb53c9..1a2b29f9e86 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -102,15 +102,23 @@ func (e ShardError) Unwrap() error { // PartialWriteError indicates a write request could only write a portion of the // requested values. type PartialWriteError struct { - Reason string - Dropped int - + Reason string + Dropped int + Database string + RetentionPolicy string // A sorted slice of series keys that were dropped. DroppedKeys [][]byte } func (e PartialWriteError) Error() string { - return fmt.Sprintf("partial write: %s dropped=%d", e.Reason, e.Dropped) + message := fmt.Sprintf("partial write: %s dropped=%d", e.Reason, e.Dropped) + if len(e.Database) > 0 { + message = fmt.Sprintf("%s for database: %s", message, e.Database) + } + if len(e.RetentionPolicy) > 0 { + message = fmt.Sprintf("%s for retention policy: %s", message, e.RetentionPolicy) + } + return message } // Shard represents a self-contained time series database. An inverted index of diff --git a/v1/coordinator/points_writer.go b/v1/coordinator/points_writer.go index 4247054470f..fc891fc210c 100644 --- a/v1/coordinator/points_writer.go +++ b/v1/coordinator/points_writer.go @@ -75,21 +75,91 @@ func NewPointsWriter(writeTimeout time.Duration, path string) *PointsWriter { } } +type BoundType int + +const ( + WithinBounds BoundType = iota + RetentionPolicyBound + WriteWindowUpperBound + WriteWindowLowerBound + MaxBoundType // always the largest bound type, not for actual use +) + +func (b BoundType) String() string { + switch b { + case RetentionPolicyBound: + return "Retention Policy Lower Bound" + case WriteWindowUpperBound: + return "Write Window Upper Bound" + case WriteWindowLowerBound: + return "Write Window Lower Bound" + case WithinBounds: + return "Within Bounds" + default: + return "Unknown" + } +} + +type DroppedPoint struct { + Point models.Point + ViolatedBound time.Time + Reason BoundType +} + +func (d *DroppedPoint) String() string { + return fmt.Sprintf("point %s at %s dropped because it violates a %s at %s", d.Point.Key(), d.Point.Time().UTC().Format(time.RFC3339Nano), d.Reason.String(), d.ViolatedBound.UTC().Format(time.RFC3339Nano)) +} + // ShardMapping contains a mapping of shards to points. type ShardMapping struct { - n int - Points map[uint64][]models.Point // The points associated with a shard ID - Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID - Dropped []models.Point // Points that were dropped + n int + Points map[uint64][]models.Point // The points associated with a shard ID + Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID + MaxDropped DroppedPoint + MinDropped DroppedPoint + RetentionDropped int + WriteWindowDropped int + rpi *meta.RetentionPolicyInfo } // NewShardMapping creates an empty ShardMapping. -func NewShardMapping(n int) *ShardMapping { +func NewShardMapping(rpi *meta.RetentionPolicyInfo, n int) *ShardMapping { return &ShardMapping{ n: n, Points: map[uint64][]models.Point{}, Shards: map[uint64]*meta.ShardInfo{}, + rpi: rpi, + } +} + +func (s *ShardMapping) AddDropped(p models.Point, t time.Time, b BoundType) { + if s.MaxDropped.Point == nil || p.Time().After(s.MaxDropped.Point.Time()) { + s.MaxDropped = DroppedPoint{Point: p, ViolatedBound: t, Reason: b} } + if s.MinDropped.Point == nil || p.Time().Before(s.MinDropped.Point.Time()) { + s.MinDropped = DroppedPoint{Point: p, ViolatedBound: t, Reason: b} + } + switch b { + case RetentionPolicyBound: + s.RetentionDropped++ + case WriteWindowLowerBound, WriteWindowUpperBound: + s.WriteWindowDropped++ + } +} + +func (s *ShardMapping) Dropped() int { + return s.RetentionDropped + s.WriteWindowDropped +} + +func (s *ShardMapping) SummariseDropped() string { + if s.Dropped() <= 0 { + return "" + } + return fmt.Sprintf("dropped %d points outside retention policy of duration %s - oldest %s, newest %s", + s.RetentionDropped, + s.rpi.Duration.String(), + s.MinDropped.String(), + s.MaxDropped.String()) } // MapPoint adds the point to the ShardMapping, associated with the given shardInfo. @@ -247,13 +317,13 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) list.Add(*sg) } - mapping := NewShardMapping(len(wp.Points)) + mapping := NewShardMapping(rp, len(wp.Points)) for _, p := range wp.Points { sg := list.ShardGroupAt(p.Time()) if sg == nil { // We didn't create a shard group because the point was outside the - // scope of the RP. - mapping.Dropped = append(mapping.Dropped, p) + // scope of the RP + mapping.AddDropped(p, min, RetentionPolicyBound) continue } else if len(sg.Shards) <= 0 { // Shard groups should have at least one shard. @@ -361,7 +431,7 @@ func (w *PointsWriter) WritePoints( ctx context.Context, database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, - user meta.User, + _ meta.User, points []models.Point, ) error { return w.WritePointsPrivileged(ctx, database, retentionPolicy, consistencyLevel, points) @@ -406,12 +476,18 @@ func (w *PointsWriter) WritePointsPrivileged( }(shardMappings.Shards[shardID], database, retentionPolicy, points) } - if len(shardMappings.Dropped) > 0 { - w.stats.pointsWriteDropped.Observe(float64(len(shardMappings.Dropped))) - err = tsdb.PartialWriteError{Reason: "points beyond retention policy", Dropped: len(shardMappings.Dropped)} - } timeout := time.NewTimer(w.WriteTimeout) defer timeout.Stop() + + if err == nil && shardMappings.Dropped() > 0 { + w.stats.pointsWriteDropped.Observe(float64(shardMappings.Dropped())) + err = tsdb.PartialWriteError{Reason: shardMappings.SummariseDropped(), + Dropped: shardMappings.Dropped(), + Database: database, + RetentionPolicy: retentionPolicy, + } + } + for range shardMappings.Points { select { case <-w.closing: diff --git a/v1/coordinator/points_writer_test.go b/v1/coordinator/points_writer_test.go index 3a2d2728526..6f76286b5ca 100644 --- a/v1/coordinator/points_writer_test.go +++ b/v1/coordinator/points_writer_test.go @@ -1,7 +1,11 @@ package coordinator_test import ( +<<<<<<< HEAD:v1/coordinator/points_writer_test.go "context" +======= + "errors" +>>>>>>> 62e803e673 (feat: improve dropped point logging (#26257)):coordinator/points_writer_test.go "fmt" "sync" "sync/atomic" @@ -51,6 +55,127 @@ func TestPointsWriter_MapShards_One(t *testing.T) { } } +<<<<<<< HEAD:v1/coordinator/points_writer_test.go +======= +func TestPointsWriter_MapShards_WriteLimits(t *testing.T) { + ms := PointsWriterMetaClient{} + c := coordinator.NewPointsWriter() + + MustParseDuration := func(s string) time.Duration { + d, err := time.ParseDuration(s) + require.NoError(t, err, "failed to parse duration: %q", s) + return d + } + + pastWriteLimit := MustParseDuration("10m") + futureWriteLimit := MustParseDuration("15m") + rp := NewRetentionPolicy("myp", time.Now().Add(-time.Minute*45), 3*time.Hour, 3, futureWriteLimit, pastWriteLimit) + + ms.NodeIDFn = func() uint64 { return 1 } + ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) { + return rp, nil + } + + ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) { + return &rp.ShardGroups[0], nil + } + + c.MetaClient = ms + + pr := &coordinator.WritePointsRequest{ + Database: "mydb", + RetentionPolicy: "myrp", + } + + pr.AddPoint("cpu", 0.0, time.Now(), nil) + pr.AddPoint("cpu", 1.0, time.Now().Add(time.Second), nil) + pr.AddPoint("cpu", 2.0, time.Now().Add(time.Minute*30), nil) + pr.AddPoint("cpu", -1.0, time.Now().Add(-time.Minute*5), nil) + pr.AddPoint("cpu", -2.0, time.Now().Add(-time.Minute*20), nil) + + values := []float64{0.0, 1.0, -1.0} + + MapPoints(t, c, pr, values, 2, + &coordinator.DroppedPoint{Point: pr.Points[4], Reason: coordinator.WriteWindowLowerBound}, + &coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound}, + "dropped 0 points outside retention policy of duration 3h0m0s and 2 points outside write window (-10m0s to 15m0s) -") + + // Clear the write limits by setting them to zero + // No points should be dropped + zeroDuration := time.Duration(0) + rpu := &meta.RetentionPolicyUpdate{ + Name: nil, + Duration: nil, + ReplicaN: nil, + ShardGroupDuration: nil, + FutureWriteLimit: &zeroDuration, + PastWriteLimit: &zeroDuration, + } + require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed") + values = []float64{0.0, 1.0, 2.0, -1.0, -2.0} + MapPoints(t, c, pr, values, 0, nil, nil, "dropped 0 points outside retention policy of duration 3h0m0s -") + + rpu.SetFutureWriteLimit(futureWriteLimit) + require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed") + values = []float64{0.0, 1.0, -1.0, -2.0} + MapPoints(t, c, pr, values, 1, + &coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound}, + &coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound}, + "dropped 0 points outside retention policy of duration 3h0m0s and 1 points outside write window (15m0s) -") + + rpu.SetFutureWriteLimit(zeroDuration) + rpu.SetPastWriteLimit(pastWriteLimit) + require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed") + values = []float64{0.0, 1.0, 2.0, -1.0} + MapPoints(t, c, pr, values, 1, + &coordinator.DroppedPoint{Point: pr.Points[4], Reason: coordinator.WriteWindowLowerBound}, + &coordinator.DroppedPoint{Point: pr.Points[4], Reason: coordinator.WriteWindowLowerBound}, + "dropped 0 points outside retention policy of duration 3h0m0s and 1 points outside write window (-10m0s) -") + +} + +func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WritePointsRequest, values []float64, droppedCount int, minDropped *coordinator.DroppedPoint, maxDropped *coordinator.DroppedPoint, summary string) { + var ( + shardMappings *coordinator.ShardMapping + err error + ) + if shardMappings, err = c.MapShards(pr); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if exp := 1; len(shardMappings.Points) != exp { + t.Errorf("MapShards() len mismatch. got %v, exp %v", len(shardMappings.Points), exp) + } + + p := func() []models.Point { + for _, v := range shardMappings.Points { + return v + } + return nil + }() + verify := + func(p []models.Point, values []float64) { + require.Equal(t, len(values), len(p), "unexpected number of points") + for i, expV := range values { + f, err := p[i].Fields() + require.NoError(t, err, "error retrieving fields") + v, ok := f["value"] + require.True(t, ok, "\"value\" field not found") + require.Equal(t, expV, v, "unexpected value") + } + } + verify(p, values) + require.Equal(t, shardMappings.Dropped(), droppedCount, "wrong number of points dropped") + if shardMappings.Dropped() > 0 { + require.Equal(t, minDropped.Point, shardMappings.MinDropped.Point, "minimum dropped point mismatch") + require.Equal(t, minDropped.Reason, shardMappings.MinDropped.Reason, "minimum dropped reason mismatch") + require.Equal(t, maxDropped.Point, shardMappings.MaxDropped.Point, "maximum dropped point mismatch") + require.Equal(t, maxDropped.Reason, shardMappings.MaxDropped.Reason, "maximum dropped reason mismatch") + require.Contains(t, shardMappings.SummariseDropped(), summary, "summary mismatch") + } +} + +>>>>>>> 62e803e673 (feat: improve dropped point logging (#26257)):coordinator/points_writer_test.go // Ensures the points writer maps to a new shard group when the shard duration // is changed. func TestPointsWriter_MapShards_AlterShardDuration(t *testing.T) { @@ -239,9 +364,11 @@ func TestPointsWriter_MapShards_Invalid(t *testing.T) { t.Errorf("MapShards() len mismatch. got %v, exp %v", got, exp) } - if got, exp := len(shardMappings.Dropped), 1; got != exp { + if got, exp := shardMappings.RetentionDropped, 1; got != exp { t.Fatalf("MapShard() dropped mismatch: got %v, exp %v", got, exp) } + + require.Equal(t, coordinator.RetentionPolicyBound, shardMappings.MinDropped.Reason, "unexpected reason for dropped point") } func TestPointsWriter_WritePoints(t *testing.T) { @@ -288,6 +415,7 @@ func TestPointsWriter_WritePoints(t *testing.T) { pr.AddPoint("cpu", 3.0, time.Now().Add(time.Hour+time.Second), nil) // copy to prevent data race +<<<<<<< HEAD:v1/coordinator/points_writer_test.go sm := coordinator.NewShardMapping(16) sm.MapPoint( &meta.ShardInfo{ID: uint64(1), Owners: []meta.ShardOwner{ @@ -310,6 +438,25 @@ func TestPointsWriter_WritePoints(t *testing.T) { {NodeID: 3}, }}, pr.Points[2]) +======= + theTest := test + sm := coordinator.NewShardMapping(nil, 16) + sm.MapPoint(&meta.ShardInfo{ID: uint64(1), Owners: []meta.ShardOwner{ + {NodeID: 1}, + {NodeID: 2}, + {NodeID: 3}, + }}, pr.Points[0]) + sm.MapPoint(&meta.ShardInfo{ID: uint64(2), Owners: []meta.ShardOwner{ + {NodeID: 1}, + {NodeID: 2}, + {NodeID: 3}, + }}, pr.Points[1]) + sm.MapPoint(&meta.ShardInfo{ID: uint64(2), Owners: []meta.ShardOwner{ + {NodeID: 1}, + {NodeID: 2}, + {NodeID: 3}, + }}, pr.Points[2]) +>>>>>>> 62e803e673 (feat: improve dropped point logging (#26257)):coordinator/points_writer_test.go // Local coordinator.Node ShardWriter // lock on the write increment since these functions get called in parallel @@ -360,16 +507,9 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) { // are created. ms := NewPointsWriterMetaClient() - // Three points that range over the shardGroup duration (1h) and should map to two - // distinct shards + // Add a point earlier than the retention period pr.AddPoint("cpu", 1.0, time.Now().Add(-24*time.Hour), nil) - // copy to prevent data race - sm := coordinator.NewShardMapping(16) - - // ShardMapper dropped this point - sm.Dropped = append(sm.Dropped, pr.Points[0]) - // Local coordinator.Node ShardWriter // lock on the write increment since these functions get called in parallel var mu sync.Mutex @@ -392,13 +532,25 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) { c.TSDBStore = store c.Node = &influxdb.Node{ID: 1} - c.Open() - defer c.Close() + require.NoError(t, c.Open(), "failure opening PointsWriter") + defer func(pw *coordinator.PointsWriter) { + require.NoError(t, pw.Close(), "failure closing PointsWriter") + }(c) +<<<<<<< HEAD:v1/coordinator/points_writer_test.go err := c.WritePointsPrivileged(context.Background(), pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points) if _, ok := err.(tsdb.PartialWriteError); !ok { +======= + err := c.WritePointsPrivileged(tsdb.WriteContext{}, pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points) + require.Error(t, err, "unexpected success writing points") + var pwErr tsdb.PartialWriteError + if !errors.As(err, &pwErr) { +>>>>>>> 62e803e673 (feat: improve dropped point logging (#26257)):coordinator/points_writer_test.go t.Errorf("PointsWriter.WritePoints(): got %v, exp %v", err, tsdb.PartialWriteError{}) } + require.Equal(t, 1, pwErr.Dropped, "wrong number of points dropped") + require.ErrorContains(t, pwErr, "partial write: dropped 1 points outside retention policy of duration 1h0m0s") + require.ErrorContains(t, pwErr, "Retention Policy Lower Bound") } var shardID uint64 From ff06088b6c8cbc1318737544023146b8074f7e20 Mon Sep 17 00:00:00 2001 From: davidby-influx Date: Mon, 21 Apr 2025 18:33:44 -0700 Subject: [PATCH 2/2] fix: cherry-pick tests --- v1/coordinator/points_writer_test.go | 111 +-------------------------- 1 file changed, 2 insertions(+), 109 deletions(-) diff --git a/v1/coordinator/points_writer_test.go b/v1/coordinator/points_writer_test.go index 6f76286b5ca..d81890c868a 100644 --- a/v1/coordinator/points_writer_test.go +++ b/v1/coordinator/points_writer_test.go @@ -1,12 +1,10 @@ package coordinator_test import ( -<<<<<<< HEAD:v1/coordinator/points_writer_test.go "context" -======= "errors" ->>>>>>> 62e803e673 (feat: improve dropped point logging (#26257)):coordinator/points_writer_test.go "fmt" + "github.com/stretchr/testify/require" "sync" "sync/atomic" "testing" @@ -55,85 +53,6 @@ func TestPointsWriter_MapShards_One(t *testing.T) { } } -<<<<<<< HEAD:v1/coordinator/points_writer_test.go -======= -func TestPointsWriter_MapShards_WriteLimits(t *testing.T) { - ms := PointsWriterMetaClient{} - c := coordinator.NewPointsWriter() - - MustParseDuration := func(s string) time.Duration { - d, err := time.ParseDuration(s) - require.NoError(t, err, "failed to parse duration: %q", s) - return d - } - - pastWriteLimit := MustParseDuration("10m") - futureWriteLimit := MustParseDuration("15m") - rp := NewRetentionPolicy("myp", time.Now().Add(-time.Minute*45), 3*time.Hour, 3, futureWriteLimit, pastWriteLimit) - - ms.NodeIDFn = func() uint64 { return 1 } - ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) { - return rp, nil - } - - ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) { - return &rp.ShardGroups[0], nil - } - - c.MetaClient = ms - - pr := &coordinator.WritePointsRequest{ - Database: "mydb", - RetentionPolicy: "myrp", - } - - pr.AddPoint("cpu", 0.0, time.Now(), nil) - pr.AddPoint("cpu", 1.0, time.Now().Add(time.Second), nil) - pr.AddPoint("cpu", 2.0, time.Now().Add(time.Minute*30), nil) - pr.AddPoint("cpu", -1.0, time.Now().Add(-time.Minute*5), nil) - pr.AddPoint("cpu", -2.0, time.Now().Add(-time.Minute*20), nil) - - values := []float64{0.0, 1.0, -1.0} - - MapPoints(t, c, pr, values, 2, - &coordinator.DroppedPoint{Point: pr.Points[4], Reason: coordinator.WriteWindowLowerBound}, - &coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound}, - "dropped 0 points outside retention policy of duration 3h0m0s and 2 points outside write window (-10m0s to 15m0s) -") - - // Clear the write limits by setting them to zero - // No points should be dropped - zeroDuration := time.Duration(0) - rpu := &meta.RetentionPolicyUpdate{ - Name: nil, - Duration: nil, - ReplicaN: nil, - ShardGroupDuration: nil, - FutureWriteLimit: &zeroDuration, - PastWriteLimit: &zeroDuration, - } - require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed") - values = []float64{0.0, 1.0, 2.0, -1.0, -2.0} - MapPoints(t, c, pr, values, 0, nil, nil, "dropped 0 points outside retention policy of duration 3h0m0s -") - - rpu.SetFutureWriteLimit(futureWriteLimit) - require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed") - values = []float64{0.0, 1.0, -1.0, -2.0} - MapPoints(t, c, pr, values, 1, - &coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound}, - &coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound}, - "dropped 0 points outside retention policy of duration 3h0m0s and 1 points outside write window (15m0s) -") - - rpu.SetFutureWriteLimit(zeroDuration) - rpu.SetPastWriteLimit(pastWriteLimit) - require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed") - values = []float64{0.0, 1.0, 2.0, -1.0} - MapPoints(t, c, pr, values, 1, - &coordinator.DroppedPoint{Point: pr.Points[4], Reason: coordinator.WriteWindowLowerBound}, - &coordinator.DroppedPoint{Point: pr.Points[4], Reason: coordinator.WriteWindowLowerBound}, - "dropped 0 points outside retention policy of duration 3h0m0s and 1 points outside write window (-10m0s) -") - -} - func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WritePointsRequest, values []float64, droppedCount int, minDropped *coordinator.DroppedPoint, maxDropped *coordinator.DroppedPoint, summary string) { var ( shardMappings *coordinator.ShardMapping @@ -175,7 +94,6 @@ func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WriteP } } ->>>>>>> 62e803e673 (feat: improve dropped point logging (#26257)):coordinator/points_writer_test.go // Ensures the points writer maps to a new shard group when the shard duration // is changed. func TestPointsWriter_MapShards_AlterShardDuration(t *testing.T) { @@ -415,8 +333,7 @@ func TestPointsWriter_WritePoints(t *testing.T) { pr.AddPoint("cpu", 3.0, time.Now().Add(time.Hour+time.Second), nil) // copy to prevent data race -<<<<<<< HEAD:v1/coordinator/points_writer_test.go - sm := coordinator.NewShardMapping(16) + sm := coordinator.NewShardMapping(nil, 16) sm.MapPoint( &meta.ShardInfo{ID: uint64(1), Owners: []meta.ShardOwner{ {NodeID: 1}, @@ -438,25 +355,6 @@ func TestPointsWriter_WritePoints(t *testing.T) { {NodeID: 3}, }}, pr.Points[2]) -======= - theTest := test - sm := coordinator.NewShardMapping(nil, 16) - sm.MapPoint(&meta.ShardInfo{ID: uint64(1), Owners: []meta.ShardOwner{ - {NodeID: 1}, - {NodeID: 2}, - {NodeID: 3}, - }}, pr.Points[0]) - sm.MapPoint(&meta.ShardInfo{ID: uint64(2), Owners: []meta.ShardOwner{ - {NodeID: 1}, - {NodeID: 2}, - {NodeID: 3}, - }}, pr.Points[1]) - sm.MapPoint(&meta.ShardInfo{ID: uint64(2), Owners: []meta.ShardOwner{ - {NodeID: 1}, - {NodeID: 2}, - {NodeID: 3}, - }}, pr.Points[2]) ->>>>>>> 62e803e673 (feat: improve dropped point logging (#26257)):coordinator/points_writer_test.go // Local coordinator.Node ShardWriter // lock on the write increment since these functions get called in parallel @@ -537,15 +435,10 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) { require.NoError(t, pw.Close(), "failure closing PointsWriter") }(c) -<<<<<<< HEAD:v1/coordinator/points_writer_test.go err := c.WritePointsPrivileged(context.Background(), pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points) - if _, ok := err.(tsdb.PartialWriteError); !ok { -======= - err := c.WritePointsPrivileged(tsdb.WriteContext{}, pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points) require.Error(t, err, "unexpected success writing points") var pwErr tsdb.PartialWriteError if !errors.As(err, &pwErr) { ->>>>>>> 62e803e673 (feat: improve dropped point logging (#26257)):coordinator/points_writer_test.go t.Errorf("PointsWriter.WritePoints(): got %v, exp %v", err, tsdb.PartialWriteError{}) } require.Equal(t, 1, pwErr.Dropped, "wrong number of points dropped")