-
Notifications
You must be signed in to change notification settings - Fork 3.7k
feat: improve dropped point logging #26257
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
0538273
3741b16
b1efaf7
e10e9ca
bd5b1bd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -95,12 +95,50 @@ func NewPointsWriter() *PointsWriter { | |
| } | ||
| } | ||
|
|
||
| type BoundType int | ||
|
|
||
| const ( | ||
| WithinBounds BoundType = iota | ||
| RetentionPolicyBound | ||
| WriteWindowUpperBound | ||
| WriteWindowLowerBound | ||
| MaxBoundType // always the largest bound type, not for actual use | ||
| ) | ||
|
|
||
| func (b BoundType) String() string { | ||
| switch b { | ||
| case RetentionPolicyBound: | ||
| return "Retention Policy Lower Bound" | ||
| case WriteWindowUpperBound: | ||
| return "Write Window Upper Bound" | ||
| case WriteWindowLowerBound: | ||
| return "Write Window Lower Bound" | ||
| case WithinBounds: | ||
| return "Within Bounds" | ||
| default: | ||
| return "Unknown" | ||
| } | ||
| } | ||
|
|
||
| type DroppedPoint struct { | ||
| Point models.Point | ||
| ViolatedBound time.Time | ||
| Reason BoundType | ||
| } | ||
|
|
||
| func (d *DroppedPoint) String() string { | ||
| return fmt.Sprintf("point %s at %s dropped because it violates a %s at %s", d.Point.Key(), d.Point.Time().UTC().Format(time.RFC3339Nano), d.Reason.String(), d.ViolatedBound.UTC().Format(time.RFC3339Nano)) | ||
| } | ||
|
|
||
| // ShardMapping contains a mapping of shards to points. | ||
| type ShardMapping struct { | ||
| n int | ||
| Points map[uint64][]models.Point // The points associated with a shard ID | ||
| Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID | ||
| Dropped []models.Point // Points that were dropped | ||
| n int | ||
| Points map[uint64][]models.Point // The points associated with a shard ID | ||
| Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID | ||
| MaxDropped DroppedPoint | ||
| MinDropped DroppedPoint | ||
| RetentionDropped int | ||
| WriteWindowDropped int | ||
| } | ||
|
|
||
| // NewShardMapping creates an empty ShardMapping. | ||
|
|
@@ -112,6 +150,36 @@ func NewShardMapping(n int) *ShardMapping { | |
| } | ||
| } | ||
|
|
||
| func (s *ShardMapping) AddDropped(p models.Point, t time.Time, b BoundType) { | ||
| if s.MaxDropped.Point == nil || p.Time().After(s.MaxDropped.Point.Time()) { | ||
| s.MaxDropped = DroppedPoint{Point: p, ViolatedBound: t, Reason: b} | ||
| } | ||
| if s.MinDropped.Point == nil || p.Time().Before(s.MinDropped.Point.Time()) { | ||
| s.MinDropped = DroppedPoint{Point: p, ViolatedBound: t, Reason: b} | ||
| } | ||
| switch b { | ||
| case RetentionPolicyBound: | ||
| s.RetentionDropped++ | ||
| case WriteWindowLowerBound, WriteWindowUpperBound: | ||
| s.WriteWindowDropped++ | ||
| } | ||
| } | ||
|
|
||
| func (s *ShardMapping) Dropped() int { | ||
| return s.RetentionDropped + s.WriteWindowDropped | ||
| } | ||
|
|
||
| func (s *ShardMapping) SummariseDropped() string { | ||
| if s.Dropped() <= 0 { | ||
| return "" | ||
| } | ||
| return fmt.Sprintf("dropped %d points outside retention policy and %d points outside write window: %s to %s", | ||
| s.RetentionDropped, | ||
| s.WriteWindowDropped, | ||
| s.MinDropped.String(), | ||
| s.MaxDropped.String()) | ||
| } | ||
|
|
||
| // MapPoint adds the point to the ShardMapping, associated with the given shardInfo. | ||
| func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point) { | ||
| if cap(s.Points[shardInfo.ID]) < s.n { | ||
|
|
@@ -147,14 +215,14 @@ func NewWriteWindow(rp *meta.RetentionPolicyInfo) *WriteWindow { | |
| return w | ||
| } | ||
|
|
||
| func (w *WriteWindow) WithinWindow(t time.Time) bool { | ||
| func (w *WriteWindow) WithinWindow(t time.Time) (bool, time.Time, BoundType) { | ||
| if w.checkBefore && t.Before(w.before) { | ||
| return false | ||
| return false, w.before, WriteWindowLowerBound | ||
| } | ||
| if w.checkAfter && t.After(w.after) { | ||
| return false | ||
| return false, w.after, WriteWindowUpperBound | ||
| } | ||
| return true | ||
| return true, time.Time{}, WithinBounds | ||
| } | ||
|
|
||
| // Open opens the communication channel with the point writer. | ||
|
|
@@ -229,7 +297,8 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) | |
| // Either the point is outside the scope of the RP, we already have | ||
| // a suitable shard group for the point, or it is outside the write window | ||
| // for the RP, and we don't want to unnecessarily create a shard for it | ||
| if p.Time().Before(min) || list.Covers(p.Time()) || !ww.WithinWindow(p.Time()) { | ||
| withinWindow, _, _ := ww.WithinWindow(p.Time()) | ||
| if p.Time().Before(min) || list.Covers(p.Time()) || !withinWindow { | ||
| continue | ||
| } | ||
|
|
||
|
|
@@ -249,10 +318,15 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) | |
| mapping := NewShardMapping(len(wp.Points)) | ||
| for _, p := range wp.Points { | ||
| sg := list.ShardGroupAt(p.Time()) | ||
| if sg == nil || !ww.WithinWindow(p.Time()) { | ||
| if sg == nil { | ||
| // We didn't create a shard group because the point was outside the | ||
| // scope of the RP, or the point is outside the write window for the RP. | ||
| mapping.Dropped = append(mapping.Dropped, p) | ||
| // scope of the RP | ||
| mapping.AddDropped(p, min, RetentionPolicyBound) | ||
| atomic.AddInt64(&w.stats.WriteDropped, 1) | ||
| continue | ||
| } else if withinWindow, bound, reason := ww.WithinWindow(p.Time()); !withinWindow { | ||
| // The point is outside the write window for the RP. | ||
| mapping.AddDropped(p, bound, reason) | ||
| atomic.AddInt64(&w.stats.WriteDropped, 1) | ||
| continue | ||
| } else if len(sg.Shards) <= 0 { | ||
|
|
@@ -420,9 +494,9 @@ func (w *PointsWriter) WritePointsPrivileged(writeCtx tsdb.WriteContext, databas | |
| w.Subscriber.Send(pts) | ||
| atomic.AddInt64(&w.stats.SubWriteOK, 1) | ||
|
|
||
| if err == nil && len(shardMappings.Dropped) > 0 { | ||
| err = tsdb.PartialWriteError{Reason: "points beyond retention policy or outside permissible write window", | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The new code would have an error message of "dropped %d points outside retention policy of duration". I assume this message is what the client will receive. Any risk of this breaking any client libraries or C1 alerting?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In theory, it could break picky client code, but my hope is that people are switching on the 4XX HTTP error, not the message text. |
||
| Dropped: len(shardMappings.Dropped), | ||
| if err == nil && shardMappings.Dropped() > 0 { | ||
| err = tsdb.PartialWriteError{Reason: shardMappings.SummariseDropped(), | ||
| Dropped: shardMappings.Dropped(), | ||
| Database: database, | ||
| RetentionPolicy: retentionPolicy, | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to include the retention policy and/or write window bounds in the summary? Might make debugging customer issues easier, especially if their retention or write window is now what they think it is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DroppedPoint.String()prints that information, so we get it for the max and min points.