Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 110 additions & 17 deletions coordinator/points_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -95,23 +96,109 @@ func NewPointsWriter() *PointsWriter {
}
}

// BoundType identifies which time bound, if any, a point violated when it
// was dropped during shard mapping.
type BoundType int

const (
	// WithinBounds means the point's timestamp violates no bound.
	WithinBounds BoundType = iota
	// RetentionPolicyBound means the point is older than the retention
	// policy's lower time bound.
	RetentionPolicyBound
	// WriteWindowUpperBound means the point is later than the permissible
	// future write window.
	WriteWindowUpperBound
	// WriteWindowLowerBound means the point is earlier than the permissible
	// past write window.
	WriteWindowLowerBound
	MaxBoundType // always the largest bound type, not for actual use
)

// boundTypeNames maps each BoundType to its human-readable label.
var boundTypeNames = map[BoundType]string{
	WithinBounds:          "Within Bounds",
	RetentionPolicyBound:  "Retention Policy Lower Bound",
	WriteWindowUpperBound: "Write Window Upper Bound",
	WriteWindowLowerBound: "Write Window Lower Bound",
}

// String returns the human-readable label for b, or "Unknown" for any
// value outside the defined bound types.
func (b BoundType) String() string {
	if name, ok := boundTypeNames[b]; ok {
		return name
	}
	return "Unknown"
}

// DroppedPoint records a single point that was dropped during shard
// mapping together with the bound it violated.
type DroppedPoint struct {
	Point         models.Point // the point that was dropped
	ViolatedBound time.Time    // timestamp of the bound the point fell outside
	Reason        BoundType    // which kind of bound was violated
}

// String describes the dropped point: its series key, its timestamp, the
// kind of bound it violated, and that bound's timestamp (all times UTC).
func (d *DroppedPoint) String() string {
	pointTime := d.Point.Time().UTC().Format(time.RFC3339Nano)
	boundTime := d.ViolatedBound.UTC().Format(time.RFC3339Nano)
	return fmt.Sprintf("point %s at %s dropped because it violates a %s at %s", d.Point.Key(), pointTime, d.Reason.String(), boundTime)
}

// ShardMapping contains a mapping of shards to points.
// NOTE(review): this span contained interleaved old- and new-version diff
// lines (including the removed `Dropped []models.Point` field); only the
// new-version fields are kept so the declaration is valid Go.
type ShardMapping struct {
	n                  int
	Points             map[uint64][]models.Point  // The points associated with a shard ID
	Shards             map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID
	MaxDropped         DroppedPoint               // dropped point with the latest timestamp
	MinDropped         DroppedPoint               // dropped point with the earliest timestamp
	RetentionDropped   int                        // count of points dropped for violating the retention policy
	WriteWindowDropped int                        // count of points dropped for falling outside the write window
	rpi                *meta.RetentionPolicyInfo  // retention policy used when summarising drops; may be nil
}

// NewShardMapping creates an empty ShardMapping.
func NewShardMapping(n int) *ShardMapping {
func NewShardMapping(rpi *meta.RetentionPolicyInfo, n int) *ShardMapping {
return &ShardMapping{
n: n,
Points: map[uint64][]models.Point{},
Shards: map[uint64]*meta.ShardInfo{},
rpi: rpi,
}
}

// AddDropped records point p as dropped for violating bound b (whose
// timestamp is t): it updates the min/max dropped trackers and bumps the
// per-reason drop counter.
func (s *ShardMapping) AddDropped(p models.Point, t time.Time, b BoundType) {
	record := DroppedPoint{Point: p, ViolatedBound: t, Reason: b}

	// Track the newest and oldest dropped points seen so far; a nil Point
	// means nothing has been recorded yet.
	if s.MaxDropped.Point == nil || p.Time().After(s.MaxDropped.Point.Time()) {
		s.MaxDropped = record
	}
	if s.MinDropped.Point == nil || p.Time().Before(s.MinDropped.Point.Time()) {
		s.MinDropped = record
	}

	if b == RetentionPolicyBound {
		s.RetentionDropped++
	} else if b == WriteWindowLowerBound || b == WriteWindowUpperBound {
		s.WriteWindowDropped++
	}
}

// Dropped returns the total number of points this mapping dropped,
// regardless of the reason.
func (s *ShardMapping) Dropped() int {
	total := s.RetentionDropped + s.WriteWindowDropped
	return total
}

// SummariseDropped returns a human-readable summary of the points dropped
// by this mapping: the retention-policy drop count and duration, the write
// window (when one is configured) and its drop count, and the oldest and
// newest dropped points. It returns the empty string when nothing was
// dropped. This text becomes the Reason of a tsdb.PartialWriteError, so
// its wording is part of the client-visible contract.
func (s *ShardMapping) SummariseDropped() string {
	if s.Dropped() <= 0 {
		return ""
	}
	// A ShardMapping can be built without retention policy info
	// (NewShardMapping(nil, n)); avoid a nil dereference in that case and
	// fall back to a minimal summary.
	if s.rpi == nil {
		return fmt.Sprintf("dropped %d points - oldest %s, newest %s",
			s.Dropped(), s.MinDropped.String(), s.MaxDropped.String())
	}
	summary := strings.Builder{}
	if s.rpi.PastWriteLimit > 0 || s.rpi.FutureWriteLimit > 0 {
		summary.WriteString(fmt.Sprintf(" and %d points outside write window (", s.WriteWindowDropped))
		if s.rpi.PastWriteLimit > 0 {
			// The past limit is rendered as a negative offset, e.g. "-10m0s".
			summary.WriteString("-")
			summary.WriteString(s.rpi.PastWriteLimit.String())
		}
		if s.rpi.FutureWriteLimit > 0 {
			// Render "(-past to future)" when both limits are set, otherwise
			// just the single configured limit.
			if s.rpi.PastWriteLimit > 0 {
				summary.WriteString(" to ")
			}
			summary.WriteString(s.rpi.FutureWriteLimit.String())
		}
		summary.WriteString(")")
	}
	return fmt.Sprintf("dropped %d points outside retention policy of duration %s%s - oldest %s, newest %s",
		s.RetentionDropped,
		s.rpi.Duration.String(),
		summary.String(),
		s.MinDropped.String(),
		s.MaxDropped.String())
}

// MapPoint adds the point to the ShardMapping, associated with the given shardInfo.
func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point) {
if cap(s.Points[shardInfo.ID]) < s.n {
Expand Down Expand Up @@ -147,14 +234,14 @@ func NewWriteWindow(rp *meta.RetentionPolicyInfo) *WriteWindow {
return w
}

func (w *WriteWindow) WithinWindow(t time.Time) bool {
func (w *WriteWindow) WithinWindow(t time.Time) (bool, time.Time, BoundType) {
if w.checkBefore && t.Before(w.before) {
return false
return false, w.before, WriteWindowLowerBound
}
if w.checkAfter && t.After(w.after) {
return false
return false, w.after, WriteWindowUpperBound
}
return true
return true, time.Time{}, WithinBounds
}

// Open opens the communication channel with the point writer.
Expand Down Expand Up @@ -229,7 +316,8 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
// Either the point is outside the scope of the RP, we already have
// a suitable shard group for the point, or it is outside the write window
// for the RP, and we don't want to unnecessarily create a shard for it
if p.Time().Before(min) || list.Covers(p.Time()) || !ww.WithinWindow(p.Time()) {
withinWindow, _, _ := ww.WithinWindow(p.Time())
if p.Time().Before(min) || list.Covers(p.Time()) || !withinWindow {
continue
}

Expand All @@ -246,13 +334,18 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
list.Add(*sg)
}

mapping := NewShardMapping(len(wp.Points))
mapping := NewShardMapping(rp, len(wp.Points))
for _, p := range wp.Points {
sg := list.ShardGroupAt(p.Time())
if sg == nil || !ww.WithinWindow(p.Time()) {
if sg == nil {
// We didn't create a shard group because the point was outside the
// scope of the RP, or the point is outside the write window for the RP.
mapping.Dropped = append(mapping.Dropped, p)
// scope of the RP
mapping.AddDropped(p, min, RetentionPolicyBound)
atomic.AddInt64(&w.stats.WriteDropped, 1)
continue
} else if withinWindow, bound, reason := ww.WithinWindow(p.Time()); !withinWindow {
// The point is outside the write window for the RP.
mapping.AddDropped(p, bound, reason)
atomic.AddInt64(&w.stats.WriteDropped, 1)
continue
} else if len(sg.Shards) <= 0 {
Expand Down Expand Up @@ -420,9 +513,9 @@ func (w *PointsWriter) WritePointsPrivileged(writeCtx tsdb.WriteContext, databas
w.Subscriber.Send(pts)
atomic.AddInt64(&w.stats.SubWriteOK, 1)

if err == nil && len(shardMappings.Dropped) > 0 {
err = tsdb.PartialWriteError{Reason: "points beyond retention policy or outside permissible write window",
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new code would have an error message of "dropped %d points outside retention policy of duration". I assume this message is what the client will receive. Any risk of this breaking any client libraries or C1 alerting?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory, it could break picky client code, but my hope is that people are switching on the 4XX HTTP error, not the message text.

Dropped: len(shardMappings.Dropped),
if err == nil && shardMappings.Dropped() > 0 {
err = tsdb.PartialWriteError{Reason: shardMappings.SummariseDropped(),
Dropped: shardMappings.Dropped(),
Database: database,
RetentionPolicy: retentionPolicy,
}
Expand Down
67 changes: 48 additions & 19 deletions coordinator/points_writer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package coordinator_test

import (
"errors"
"fmt"
"reflect"
"sync"
Expand Down Expand Up @@ -89,9 +90,11 @@ func TestPointsWriter_MapShards_WriteLimits(t *testing.T) {
pr.AddPoint("cpu", -2.0, time.Now().Add(-time.Minute*20), nil)

values := []float64{0.0, 1.0, -1.0}
dropped := []float64{2.0, -2.0}

MapPoints(t, c, pr, values, dropped)
MapPoints(t, c, pr, values, 2,
&coordinator.DroppedPoint{Point: pr.Points[4], Reason: coordinator.WriteWindowLowerBound},
&coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound},
"dropped 0 points outside retention policy of duration 3h0m0s and 2 points outside write window (-10m0s to 15m0s) -")

// Clear the write limits by setting them to zero
// No points should be dropped
Expand All @@ -106,11 +109,28 @@ func TestPointsWriter_MapShards_WriteLimits(t *testing.T) {
}
require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed")
values = []float64{0.0, 1.0, 2.0, -1.0, -2.0}
dropped = []float64{}
MapPoints(t, c, pr, values, dropped)
MapPoints(t, c, pr, values, 0, nil, nil, "dropped 0 points outside retention policy of duration 3h0m0s -")

rpu.SetFutureWriteLimit(futureWriteLimit)
require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed")
values = []float64{0.0, 1.0, -1.0, -2.0}
MapPoints(t, c, pr, values, 1,
&coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound},
&coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound},
"dropped 0 points outside retention policy of duration 3h0m0s and 1 points outside write window (15m0s) -")

rpu.SetFutureWriteLimit(zeroDuration)
rpu.SetPastWriteLimit(pastWriteLimit)
require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed")
values = []float64{0.0, 1.0, 2.0, -1.0}
MapPoints(t, c, pr, values, 1,
&coordinator.DroppedPoint{Point: pr.Points[4], Reason: coordinator.WriteWindowLowerBound},
&coordinator.DroppedPoint{Point: pr.Points[4], Reason: coordinator.WriteWindowLowerBound},
"dropped 0 points outside retention policy of duration 3h0m0s and 1 points outside write window (-10m0s) -")

}

func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WritePointsRequest, values []float64, dropped []float64) {
func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WritePointsRequest, values []float64, droppedCount int, minDropped *coordinator.DroppedPoint, maxDropped *coordinator.DroppedPoint, summary string) {
var (
shardMappings *coordinator.ShardMapping
err error
Expand Down Expand Up @@ -141,7 +161,14 @@ func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WriteP
}
}
verify(p, values)
verify(shardMappings.Dropped, dropped)
require.Equal(t, shardMappings.Dropped(), droppedCount, "wrong number of points dropped")
if shardMappings.Dropped() > 0 {
require.Equal(t, minDropped.Point, shardMappings.MinDropped.Point, "minimum dropped point mismatch")
require.Equal(t, minDropped.Reason, shardMappings.MinDropped.Reason, "minimum dropped reason mismatch")
require.Equal(t, maxDropped.Point, shardMappings.MaxDropped.Point, "maximum dropped point mismatch")
require.Equal(t, maxDropped.Reason, shardMappings.MaxDropped.Reason, "maximum dropped reason mismatch")
require.Contains(t, shardMappings.SummariseDropped(), summary, "summary mismatch")
}
}

// Ensures the points writer maps to a new shard group when the shard duration
Expand Down Expand Up @@ -332,9 +359,11 @@ func TestPointsWriter_MapShards_Invalid(t *testing.T) {
t.Errorf("MapShards() len mismatch. got %v, exp %v", got, exp)
}

if got, exp := len(shardMappings.Dropped), 1; got != exp {
if got, exp := shardMappings.RetentionDropped, 1; got != exp {
t.Fatalf("MapShard() dropped mismatch: got %v, exp %v", got, exp)
}

require.Equal(t, coordinator.RetentionPolicyBound, shardMappings.MinDropped.Reason, "unexpected reason for dropped point")
}

func TestPointsWriter_WritePoints(t *testing.T) {
Expand Down Expand Up @@ -384,7 +413,7 @@ func TestPointsWriter_WritePoints(t *testing.T) {

// copy to prevent data race
theTest := test
sm := coordinator.NewShardMapping(16)
sm := coordinator.NewShardMapping(nil, 16)
sm.MapPoint(&meta.ShardInfo{ID: uint64(1), Owners: []meta.ShardOwner{
{NodeID: 1},
{NodeID: 2},
Expand Down Expand Up @@ -467,16 +496,9 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) {
// are created.
ms := NewPointsWriterMetaClient()

// Three points that range over the shardGroup duration (1h) and should map to two
// distinct shards
// Add a point earlier than the retention period
pr.AddPoint("cpu", 1.0, time.Now().Add(-24*time.Hour), nil)

// copy to prevent data race
sm := coordinator.NewShardMapping(16)

// ShardMapper dropped this point
sm.Dropped = append(sm.Dropped, pr.Points[0])

// Local coordinator.Node ShardWriter
// lock on the write increment since these functions get called in parallel
var mu sync.Mutex
Expand Down Expand Up @@ -506,13 +528,20 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) {
c.Subscriber = sub
c.Node = &influxdb.Node{ID: 1}

c.Open()
defer c.Close()
require.NoError(t, c.Open(), "failure opening PointsWriter")
defer func(pw *coordinator.PointsWriter) {
require.NoError(t, pw.Close(), "failure closing PointsWriter")
}(c)

err := c.WritePointsPrivileged(tsdb.WriteContext{}, pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points)
if _, ok := err.(tsdb.PartialWriteError); !ok {
require.Error(t, err, "unexpected success writing points")
var pwErr tsdb.PartialWriteError
if !errors.As(err, &pwErr) {
t.Errorf("PointsWriter.WritePoints(): got %v, exp %v", err, tsdb.PartialWriteError{})
}
require.Equal(t, 1, pwErr.Dropped, "wrong number of points dropped")
require.ErrorContains(t, pwErr, "partial write: dropped 1 points outside retention policy of duration 1h0m0s")
require.ErrorContains(t, pwErr, "Retention Policy Lower Bound")
}

type fakePointsWriter struct {
Expand Down