Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func TestMappingEquivalence(t *testing.T) {
{`sum(max(rate({a=~".+"}[1s])))`, false},
{`max(count(rate({a=~".+"}[1s])))`, false},
{`max(sum by (cluster) (rate({a=~".+"}[1s]))) / count(rate({a=~".+"}[1s]))`, false},
{`sum(rate({a=~".+"} |= "foo" != "foo"[1s]) or vector(1))`, false},
// topk prefers already-seen values in tiebreakers. Since the test data generates
// the same log lines for each series & the resulting promql.Vectors aren't deterministically
// sorted by labels, we don't expect this to pass.
Expand Down
48 changes: 37 additions & 11 deletions pkg/logql/shardmapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestMappingStrings(t *testing.T) {
}{
{
in: `{foo="bar"}`,
out: `downstream<{foo="bar"}, shard=0_of_2>
out: `downstream<{foo="bar"}, shard=0_of_2>
++ downstream<{foo="bar"}, shard=1_of_2>`,
},
{
Expand All @@ -131,30 +131,30 @@ func TestMappingStrings(t *testing.T) {
{
in: `sum(rate({foo="bar"}[1m]))`,
out: `sum(
downstream<sum(rate({foo="bar"}[1m])), shard=0_of_2>
downstream<sum(rate({foo="bar"}[1m])), shard=0_of_2>
++ downstream<sum(rate({foo="bar"}[1m])), shard=1_of_2>
)`,
},
{
in: `max(count(rate({foo="bar"}[5m]))) / 2`,
out: `(max(
sum(
downstream<count(rate({foo="bar"}[5m])), shard=0_of_2>
downstream<count(rate({foo="bar"}[5m])), shard=0_of_2>
++ downstream<count(rate({foo="bar"}[5m])), shard=1_of_2>)
) / 2
)`,
},
{
in: `topk(3, rate({foo="bar"}[5m]))`,
out: `topk(3,
downstream<rate({foo="bar"}[5m]), shard=0_of_2>
downstream<rate({foo="bar"}[5m]), shard=0_of_2>
++ downstream<rate({foo="bar"}[5m]), shard=1_of_2>
)`,
},
{
in: `sum(max(rate({foo="bar"}[5m])))`,
out: `sum(max(
downstream<rate({foo="bar"}[5m]), shard=0_of_2>
downstream<rate({foo="bar"}[5m]), shard=0_of_2>
++ downstream<rate({foo="bar"}[5m]), shard=1_of_2>
))`,
},
Expand All @@ -169,33 +169,33 @@ func TestMappingStrings(t *testing.T) {
{
in: `count(rate({foo="bar"} | json [5m]))`,
out: `count(
downstream<rate({foo="bar"} | json [5m]), shard=0_of_2>
downstream<rate({foo="bar"} | json [5m]), shard=0_of_2>
++ downstream<rate({foo="bar"} | json [5m]), shard=1_of_2>
)`,
},
{
in: `avg(rate({foo="bar"} | json [5m]))`,
out: `avg(
downstream<rate({foo="bar"} | json [5m]), shard=0_of_2>
downstream<rate({foo="bar"} | json [5m]), shard=0_of_2>
++ downstream<rate({foo="bar"} | json [5m]), shard=1_of_2>
)`,
},
{
in: `{foo="bar"} |= "id=123"`,
out: `downstream<{foo="bar"}|="id=123", shard=0_of_2>
out: `downstream<{foo="bar"}|="id=123", shard=0_of_2>
++ downstream<{foo="bar"}|="id=123", shard=1_of_2>`,
},
{
in: `sum by (cluster) (rate({foo="bar"} |= "id=123" [5m]))`,
out: `sum by (cluster) (
downstream<sum by(cluster)(rate({foo="bar"}|="id=123"[5m])), shard=0_of_2>
downstream<sum by(cluster)(rate({foo="bar"}|="id=123"[5m])), shard=0_of_2>
++ downstream<sum by(cluster)(rate({foo="bar"}|="id=123"[5m])), shard=1_of_2>
)`,
},
{
in: `sum by (cluster) (sum_over_time({foo="bar"} |= "id=123" | logfmt | unwrap latency [5m]))`,
out: `sum by (cluster) (
downstream<sum by(cluster)(sum_over_time({foo="bar"}|="id=123"| logfmt | unwrap latency[5m])), shard=0_of_2>
downstream<sum by(cluster)(sum_over_time({foo="bar"}|="id=123"| logfmt | unwrap latency[5m])), shard=0_of_2>
++ downstream<sum by(cluster)(sum_over_time({foo="bar"}|="id=123"| logfmt | unwrap latency[5m])), shard=1_of_2>
)`,
},
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestMappingStrings(t *testing.T) {
in: `sum by (cluster) (rate({foo="bar"} [5m])) + ignoring(machine) sum by (cluster,machine) (rate({foo="bar"} [5m]))`,
out: `(
sum by (cluster) (
downstream<sum by (cluster) (rate({foo="bar"}[5m])), shard=0_of_2>
downstream<sum by (cluster) (rate({foo="bar"}[5m])), shard=0_of_2>
++ downstream<sum by (cluster) (rate({foo="bar"}[5m])), shard=1_of_2>
)
+ ignoring(machine) sum by (cluster, machine) (
Expand Down Expand Up @@ -268,6 +268,32 @@ func TestMappingStrings(t *testing.T) {
in: `vector(0)`,
out: `vector(0.000000)`,
},
{
// `or` exprs aren't shardable
in: `count_over_time({a=~".+"}[1s]) or count_over_time({a=~".+"}[1s])`,
out: `(downstream<count_over_time({a=~".+"}[1s]),shard=0_of_2>++downstream<count_over_time({a=~".+"}[1s]),shard=1_of_2>ordownstream<count_over_time({a=~".+"}[1s]),shard=0_of_2>++downstream<count_over_time({a=~".+"}[1s]),shard=1_of_2>)`,
},
{
// vector() exprs aren't shardable
in: `sum(count_over_time({a=~".+"}[1s]) + vector(1))`,
out: `sum((downstream<count_over_time({a=~".+"}[1s]),shard=0_of_2>++downstream<count_over_time({a=~".+"}[1s]),shard=1_of_2>+vector(1.000000)))`,
},
{
// on() is never shardable as it can mutate labels
in: `sum(count_over_time({a=~".+"}[1s]) * on () count_over_time({a=~".+"}[1s]))`,
out: `sum((downstream<count_over_time({a=~".+"}[1s]),shard=0_of_2>++downstream<count_over_time({a=~".+"}[1s]),shard=1_of_2>*on()downstream<count_over_time({a=~".+"}[1s]),shard=0_of_2>++downstream<count_over_time({a=~".+"}[1s]),shard=1_of_2>))`,
},
{
// ignoring(<non-empty-labels>) is never shardable as it can mutate labels
in: `sum(count_over_time({a=~".+"}[1s]) * ignoring (foo) count_over_time({a=~".+"}[1s]))`,
out: `sum((downstream<count_over_time({a=~".+"}[1s]),shard=0_of_2>++downstream<count_over_time({a=~".+"}[1s]),shard=1_of_2>*ignoring(foo)downstream<count_over_time({a=~".+"}[1s]),shard=0_of_2>++downstream<count_over_time({a=~".+"}[1s]),shard=1_of_2>))`,
},
{
// `ignoring ()` doesn't mutate labels and is therefore shardable,
// as long as the operation itself is shardable
in: `sum(count_over_time({a=~".+"}[1s]) * ignoring () count_over_time({a=~".+"}[1s]))`,
out: `sum(downstream<sum((count_over_time({a=~".+"}[1s])*count_over_time({a=~".+"}[1s]))),shard=0_of_2>++downstream<sum((count_over_time({a=~".+"}[1s])*count_over_time({a=~".+"}[1s]))),shard=1_of_2>)`,
},
} {
t.Run(tc.in, func(t *testing.T) {
ast, err := syntax.ParseExpr(tc.in)
Expand Down
11 changes: 9 additions & 2 deletions pkg/logql/syntax/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1174,8 +1174,15 @@ func (e *BinOpExpr) String() string {
// impl SampleExpr
func (e *BinOpExpr) Shardable() bool {
if e.Opts != nil && e.Opts.VectorMatching != nil {
// prohibit sharding when we're changing the label groupings, such as on or ignoring
return false
matching := e.Opts.VectorMatching
// prohibit sharding when we're changing the label groupings,
// such as when using `on` grouping or when using
// `ignoring` with a non-zero set of labels to ignore.
// `ignoring ()` is effectively the zero value
// that doesn't mutate labels and is shardable.
if matching.On || len(matching.MatchingLabels) > 0 {
return false
}
}
return shardableOps[e.Op] && e.SampleExpr.Shardable() && e.RHS.Shardable()
}
Expand Down
27 changes: 27 additions & 0 deletions pkg/logql/syntax/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3013,6 +3013,33 @@ func TestParse(t *testing.T) {
Operation: "count_over_time",
},
},
{
// binop always includes vector matching. Default is `ignoring ()`,
// the zero value (no `on`, empty matching-label set).
in: `
sum(count_over_time({foo="bar"}[5m])) or vector(1)
`,
exp: mustNewBinOpExpr(
OpTypeOr,
&BinOpOptions{
VectorMatching: &VectorMatching{Card: CardOneToOne},
},
mustNewVectorAggregationExpr(newRangeAggregationExpr(
&LogRange{
Left: &MatchersExpr{
Mts: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
Interval: 5 * time.Minute,
}, OpRangeTypeCount, nil, nil),
"sum",
&Grouping{},
nil,
),
NewVectorExpr("1"),
),
},
} {
t.Run(tc.in, func(t *testing.T) {
ast, err := ParseExpr(tc.in)
Expand Down