Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,23 @@ func (e ShardError) Unwrap() error {
// PartialWriteError indicates a write request could only write a portion of the
// requested values.
type PartialWriteError struct {
Reason string
Dropped int

Reason string
Dropped int
Database string
RetentionPolicy string
// A sorted slice of series keys that were dropped.
DroppedKeys [][]byte
}

func (e PartialWriteError) Error() string {
return fmt.Sprintf("partial write: %s dropped=%d", e.Reason, e.Dropped)
message := fmt.Sprintf("partial write: %s dropped=%d", e.Reason, e.Dropped)
if len(e.Database) > 0 {
message = fmt.Sprintf("%s for database: %s", message, e.Database)
}
if len(e.RetentionPolicy) > 0 {
message = fmt.Sprintf("%s for retention policy: %s", message, e.RetentionPolicy)
}
return message
}

// Shard represents a self-contained time series database. An inverted index of
Expand Down
102 changes: 89 additions & 13 deletions v1/coordinator/points_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,91 @@ func NewPointsWriter(writeTimeout time.Duration, path string) *PointsWriter {
}
}

// BoundType classifies why a point's timestamp was rejected during shard
// mapping, or that it was acceptable (WithinBounds).
type BoundType int

const (
	// WithinBounds indicates the point's timestamp is inside every bound.
	WithinBounds BoundType = iota
	// RetentionPolicyBound indicates the point fell outside the scope of
	// the retention policy (no shard group covers its time).
	RetentionPolicyBound
	// WriteWindowUpperBound indicates the point violated the upper edge of
	// the write window — presumably "too far in the future"; confirm against
	// the code that emits it.
	WriteWindowUpperBound
	// WriteWindowLowerBound indicates the point violated the lower edge of
	// the write window — presumably "too far in the past"; confirm against
	// the code that emits it.
	WriteWindowLowerBound
	MaxBoundType // always the largest bound type, not for actual use
)

// String returns the human-readable name of the bound type. Unrecognized
// values (including MaxBoundType) render as "Unknown".
func (b BoundType) String() string {
	switch b {
	case WithinBounds:
		return "Within Bounds"
	case RetentionPolicyBound:
		return "Retention Policy Lower Bound"
	case WriteWindowUpperBound:
		return "Write Window Upper Bound"
	case WriteWindowLowerBound:
		return "Write Window Lower Bound"
	}
	return "Unknown"
}

// DroppedPoint records a single point that was dropped during shard mapping,
// together with the bound it violated and why.
type DroppedPoint struct {
	// Point is the dropped point itself.
	Point models.Point
	// ViolatedBound is the timestamp of the bound the point fell outside of.
	ViolatedBound time.Time
	// Reason classifies which bound was violated (see BoundType).
	Reason BoundType
}

// String describes the dropped point: its series key, its timestamp, the kind
// of bound it violated, and the bound's timestamp (both in RFC3339Nano UTC).
func (d *DroppedPoint) String() string {
	pointTime := d.Point.Time().UTC().Format(time.RFC3339Nano)
	boundTime := d.ViolatedBound.UTC().Format(time.RFC3339Nano)
	return fmt.Sprintf("point %s at %s dropped because it violates a %s at %s",
		d.Point.Key(), pointTime, d.Reason.String(), boundTime)
}

// ShardMapping contains a mapping of shards to points.
type ShardMapping struct {
n int
Points map[uint64][]models.Point // The points associated with a shard ID
Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID
Dropped []models.Point // Points that were dropped
n int
Points map[uint64][]models.Point // The points associated with a shard ID
Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID
MaxDropped DroppedPoint
MinDropped DroppedPoint
RetentionDropped int
WriteWindowDropped int
rpi *meta.RetentionPolicyInfo
}

// NewShardMapping creates an empty ShardMapping.
func NewShardMapping(n int) *ShardMapping {
func NewShardMapping(rpi *meta.RetentionPolicyInfo, n int) *ShardMapping {
return &ShardMapping{
n: n,
Points: map[uint64][]models.Point{},
Shards: map[uint64]*meta.ShardInfo{},
rpi: rpi,
}
}

// AddDropped records a dropped point: it updates the newest (MaxDropped) and
// oldest (MinDropped) dropped points seen so far and increments the counter
// matching the violated bound b. t is the timestamp of the violated bound.
func (s *ShardMapping) AddDropped(p models.Point, t time.Time, b BoundType) {
	dropped := DroppedPoint{Point: p, ViolatedBound: t, Reason: b}
	ts := p.Time()
	// A nil Point means no drop has been recorded yet in that slot.
	if s.MaxDropped.Point == nil || ts.After(s.MaxDropped.Point.Time()) {
		s.MaxDropped = dropped
	}
	if s.MinDropped.Point == nil || ts.Before(s.MinDropped.Point.Time()) {
		s.MinDropped = dropped
	}
	if b == RetentionPolicyBound {
		s.RetentionDropped++
	} else if b == WriteWindowLowerBound || b == WriteWindowUpperBound {
		s.WriteWindowDropped++
	}
}

// Dropped returns the total number of points dropped during mapping,
// regardless of which bound each point violated.
func (s *ShardMapping) Dropped() int {
	return s.WriteWindowDropped + s.RetentionDropped
}

// SummariseDropped renders a one-line summary of the points dropped during
// mapping: the retention-policy drop count, the policy duration, and the
// oldest and newest dropped points. It returns "" when nothing was dropped.
//
// NOTE(review): the count printed is RetentionDropped only, while the guard
// checks the combined total (retention + write window) — if write-window
// drops can reach this method, the reported count may understate; confirm
// against callers.
func (s *ShardMapping) SummariseDropped() string {
	if s.Dropped() <= 0 {
		return ""
	}
	// s.rpi may be nil — NewShardMapping(nil, n) is used in tests — so guard
	// the dereference and degrade the message rather than panic.
	duration := "unknown"
	if s.rpi != nil {
		duration = s.rpi.Duration.String()
	}
	return fmt.Sprintf("dropped %d points outside retention policy of duration %s - oldest %s, newest %s",
		s.RetentionDropped,
		duration,
		s.MinDropped.String(),
		s.MaxDropped.String())
}

// MapPoint adds the point to the ShardMapping, associated with the given shardInfo.
Expand Down Expand Up @@ -247,13 +317,13 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
list.Add(*sg)
}

mapping := NewShardMapping(len(wp.Points))
mapping := NewShardMapping(rp, len(wp.Points))
for _, p := range wp.Points {
sg := list.ShardGroupAt(p.Time())
if sg == nil {
// We didn't create a shard group because the point was outside the
// scope of the RP.
mapping.Dropped = append(mapping.Dropped, p)
// scope of the RP
mapping.AddDropped(p, min, RetentionPolicyBound)
continue
} else if len(sg.Shards) <= 0 {
// Shard groups should have at least one shard.
Expand Down Expand Up @@ -361,7 +431,7 @@ func (w *PointsWriter) WritePoints(
ctx context.Context,
database, retentionPolicy string,
consistencyLevel models.ConsistencyLevel,
user meta.User,
_ meta.User,
points []models.Point,
) error {
return w.WritePointsPrivileged(ctx, database, retentionPolicy, consistencyLevel, points)
Expand Down Expand Up @@ -406,12 +476,18 @@ func (w *PointsWriter) WritePointsPrivileged(
}(shardMappings.Shards[shardID], database, retentionPolicy, points)
}

if len(shardMappings.Dropped) > 0 {
w.stats.pointsWriteDropped.Observe(float64(len(shardMappings.Dropped)))
err = tsdb.PartialWriteError{Reason: "points beyond retention policy", Dropped: len(shardMappings.Dropped)}
}
timeout := time.NewTimer(w.WriteTimeout)
defer timeout.Stop()

if err == nil && shardMappings.Dropped() > 0 {
w.stats.pointsWriteDropped.Observe(float64(shardMappings.Dropped()))
err = tsdb.PartialWriteError{Reason: shardMappings.SummariseDropped(),
Dropped: shardMappings.Dropped(),
Database: database,
RetentionPolicy: retentionPolicy,
}
}

for range shardMappings.Points {
select {
case <-w.closing:
Expand Down
71 changes: 58 additions & 13 deletions v1/coordinator/points_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package coordinator_test

import (
"context"
"errors"
"fmt"
"github.com/stretchr/testify/require"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -51,6 +53,47 @@ func TestPointsWriter_MapShards_One(t *testing.T) {
}
}

// MapPoints maps the points in pr through c and asserts that the mapping
// produced exactly one shard whose points carry the expected "value" fields,
// and that the dropped-point bookkeeping (count, min/max dropped points and
// their reasons, and the summary text) matches expectations. minDropped,
// maxDropped, and summary are only checked when droppedCount > 0.
func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WritePointsRequest, values []float64, droppedCount int, minDropped *coordinator.DroppedPoint, maxDropped *coordinator.DroppedPoint, summary string) {
	shardMappings, err := c.MapShards(pr)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if exp := 1; len(shardMappings.Points) != exp {
		t.Errorf("MapShards() len mismatch. got %v, exp %v", len(shardMappings.Points), exp)
	}

	// Grab the point slice of the (single) mapped shard.
	var p []models.Point
	for _, v := range shardMappings.Points {
		p = v
		break
	}

	require.Equal(t, len(values), len(p), "unexpected number of points")
	for i, expV := range values {
		f, err := p[i].Fields()
		require.NoError(t, err, "error retrieving fields")
		v, ok := f["value"]
		require.True(t, ok, "\"value\" field not found")
		require.Equal(t, expV, v, "unexpected value")
	}

	// require.Equal takes (expected, actual); the original call had the
	// arguments swapped, which produces misleading failure messages.
	require.Equal(t, droppedCount, shardMappings.Dropped(), "wrong number of points dropped")
	if shardMappings.Dropped() > 0 {
		require.Equal(t, minDropped.Point, shardMappings.MinDropped.Point, "minimum dropped point mismatch")
		require.Equal(t, minDropped.Reason, shardMappings.MinDropped.Reason, "minimum dropped reason mismatch")
		require.Equal(t, maxDropped.Point, shardMappings.MaxDropped.Point, "maximum dropped point mismatch")
		require.Equal(t, maxDropped.Reason, shardMappings.MaxDropped.Reason, "maximum dropped reason mismatch")
		require.Contains(t, shardMappings.SummariseDropped(), summary, "summary mismatch")
	}
}

// Ensures the points writer maps to a new shard group when the shard duration
// is changed.
func TestPointsWriter_MapShards_AlterShardDuration(t *testing.T) {
Expand Down Expand Up @@ -239,9 +282,11 @@ func TestPointsWriter_MapShards_Invalid(t *testing.T) {
t.Errorf("MapShards() len mismatch. got %v, exp %v", got, exp)
}

if got, exp := len(shardMappings.Dropped), 1; got != exp {
if got, exp := shardMappings.RetentionDropped, 1; got != exp {
t.Fatalf("MapShard() dropped mismatch: got %v, exp %v", got, exp)
}

require.Equal(t, coordinator.RetentionPolicyBound, shardMappings.MinDropped.Reason, "unexpected reason for dropped point")
}

func TestPointsWriter_WritePoints(t *testing.T) {
Expand Down Expand Up @@ -288,7 +333,7 @@ func TestPointsWriter_WritePoints(t *testing.T) {
pr.AddPoint("cpu", 3.0, time.Now().Add(time.Hour+time.Second), nil)

// copy to prevent data race
sm := coordinator.NewShardMapping(16)
sm := coordinator.NewShardMapping(nil, 16)
sm.MapPoint(
&meta.ShardInfo{ID: uint64(1), Owners: []meta.ShardOwner{
{NodeID: 1},
Expand Down Expand Up @@ -360,16 +405,9 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) {
// are created.
ms := NewPointsWriterMetaClient()

// Three points that range over the shardGroup duration (1h) and should map to two
// distinct shards
// Add a point earlier than the retention period
pr.AddPoint("cpu", 1.0, time.Now().Add(-24*time.Hour), nil)

// copy to prevent data race
sm := coordinator.NewShardMapping(16)

// ShardMapper dropped this point
sm.Dropped = append(sm.Dropped, pr.Points[0])

// Local coordinator.Node ShardWriter
// lock on the write increment since these functions get called in parallel
var mu sync.Mutex
Expand All @@ -392,13 +430,20 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) {
c.TSDBStore = store
c.Node = &influxdb.Node{ID: 1}

c.Open()
defer c.Close()
require.NoError(t, c.Open(), "failure opening PointsWriter")
defer func(pw *coordinator.PointsWriter) {
require.NoError(t, pw.Close(), "failure closing PointsWriter")
}(c)

err := c.WritePointsPrivileged(context.Background(), pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points)
if _, ok := err.(tsdb.PartialWriteError); !ok {
require.Error(t, err, "unexpected success writing points")
var pwErr tsdb.PartialWriteError
if !errors.As(err, &pwErr) {
t.Errorf("PointsWriter.WritePoints(): got %v, exp %v", err, tsdb.PartialWriteError{})
}
require.Equal(t, 1, pwErr.Dropped, "wrong number of points dropped")
require.ErrorContains(t, pwErr, "partial write: dropped 1 points outside retention policy of duration 1h0m0s")
require.ErrorContains(t, pwErr, "Retention Policy Lower Bound")
}

var shardID uint64
Expand Down