diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index aa7e0b7914971..cd3964cbb9904 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -49,6 +49,7 @@ func TestMappingEquivalence(t *testing.T) { {`sum(max(rate({a=~".+"}[1s])))`, false}, {`max(count(rate({a=~".+"}[1s])))`, false}, {`max(sum by (cluster) (rate({a=~".+"}[1s]))) / count(rate({a=~".+"}[1s]))`, false}, + {`sum(rate({a=~".+"} |= "foo" != "foo"[1s]) or vector(1))`, false}, // topk prefers already-seen values in tiebreakers. Since the test data generates // the same log lines for each series & the resulting promql.Vectors aren't deterministically // sorted by labels, we don't expect this to pass. diff --git a/pkg/logql/shardmapper_test.go b/pkg/logql/shardmapper_test.go index 8ae45bea087f3..5fa393acecbb0 100644 --- a/pkg/logql/shardmapper_test.go +++ b/pkg/logql/shardmapper_test.go @@ -120,7 +120,7 @@ func TestMappingStrings(t *testing.T) { }{ { in: `{foo="bar"}`, - out: `downstream<{foo="bar"}, shard=0_of_2> + out: `downstream<{foo="bar"}, shard=0_of_2> ++ downstream<{foo="bar"}, shard=1_of_2>`, }, { @@ -131,7 +131,7 @@ func TestMappingStrings(t *testing.T) { { in: `sum(rate({foo="bar"}[1m]))`, out: `sum( - downstream + downstream ++ downstream )`, }, @@ -139,7 +139,7 @@ func TestMappingStrings(t *testing.T) { in: `max(count(rate({foo="bar"}[5m]))) / 2`, out: `(max( sum( - downstream + downstream ++ downstream) ) / 2 )`, @@ -147,14 +147,14 @@ func TestMappingStrings(t *testing.T) { { in: `topk(3, rate({foo="bar"}[5m]))`, out: `topk(3, - downstream + downstream ++ downstream )`, }, { in: `sum(max(rate({foo="bar"}[5m])))`, out: `sum(max( - downstream + downstream ++ downstream ))`, }, @@ -169,33 +169,33 @@ func TestMappingStrings(t *testing.T) { { in: `count(rate({foo="bar"} | json [5m]))`, out: `count( - downstream + downstream ++ downstream )`, }, { in: `avg(rate({foo="bar"} | json [5m]))`, out: `avg( - downstream + downstream ++ downstream )`, }, { in: 
`{foo="bar"} |= "id=123"`, - out: `downstream<{foo="bar"}|="id=123", shard=0_of_2> + out: `downstream<{foo="bar"}|="id=123", shard=0_of_2> ++ downstream<{foo="bar"}|="id=123", shard=1_of_2>`, }, { in: `sum by (cluster) (rate({foo="bar"} |= "id=123" [5m]))`, out: `sum by (cluster) ( - downstream + downstream ++ downstream )`, }, { in: `sum by (cluster) (sum_over_time({foo="bar"} |= "id=123" | logfmt | unwrap latency [5m]))`, out: `sum by (cluster) ( - downstream + downstream ++ downstream )`, }, @@ -231,7 +231,7 @@ func TestMappingStrings(t *testing.T) { in: `sum by (cluster) (rate({foo="bar"} [5m])) + ignoring(machine) sum by (cluster,machine) (rate({foo="bar"} [5m]))`, out: `( sum by (cluster) ( - downstream + downstream ++ downstream ) + ignoring(machine) sum by (cluster, machine) ( @@ -268,6 +268,32 @@ func TestMappingStrings(t *testing.T) { in: `vector(0)`, out: `vector(0.000000)`, }, + { + // or exprs aren't shardable + in: `count_over_time({a=~".+"}[1s]) or count_over_time({a=~".+"}[1s])`, + out: `(downstream++downstreamordownstream++downstream)`, + }, + { + // vector() exprs aren't shardable + in: `sum(count_over_time({a=~".+"}[1s]) + vector(1))`, + out: `sum((downstream++downstream+vector(1.000000)))`, + }, + { + // on() is never shardable as it can mutate labels + in: `sum(count_over_time({a=~".+"}[1s]) * on () count_over_time({a=~".+"}[1s]))`, + out: `sum((downstream++downstream*on()downstream++downstream))`, + }, + { + // ignoring (<labels>) with a non-empty label list is never shardable as it can mutate labels + in: `sum(count_over_time({a=~".+"}[1s]) * ignoring (foo) count_over_time({a=~".+"}[1s]))`, + out: `sum((downstream++downstream*ignoring(foo)downstream++downstream))`, + }, + { + // ignoring () doesn't mutate labels and therefore can be shardable + // as long as the operation is shardable + in: `sum(count_over_time({a=~".+"}[1s]) * ignoring () count_over_time({a=~".+"}[1s]))`, + out: `sum(downstream++downstream)`, + }, } { t.Run(tc.in, func(t *testing.T) { ast, err := 
syntax.ParseExpr(tc.in) diff --git a/pkg/logql/syntax/ast.go b/pkg/logql/syntax/ast.go index 93f09804af728..dc83992aa8fc4 100644 --- a/pkg/logql/syntax/ast.go +++ b/pkg/logql/syntax/ast.go @@ -1174,8 +1174,15 @@ func (e *BinOpExpr) String() string { // impl SampleExpr func (e *BinOpExpr) Shardable() bool { if e.Opts != nil && e.Opts.VectorMatching != nil { - // prohibit sharding when we're changing the label groupings, such as on or ignoring - return false + matching := e.Opts.VectorMatching + // prohibit sharding when we're changing the label groupings, + // such as when using `on` grouping or when using + // `ignoring` with a non-zero set of labels to ignore. + // `ignoring ()` is effectively the zero value + // that doesn't mutate labels and is shardable. + if matching.On || len(matching.MatchingLabels) > 0 { + return false + } } return shardableOps[e.Op] && e.SampleExpr.Shardable() && e.RHS.Shardable() } diff --git a/pkg/logql/syntax/parser_test.go b/pkg/logql/syntax/parser_test.go index b7f002c3059b4..3746eb9385285 100644 --- a/pkg/logql/syntax/parser_test.go +++ b/pkg/logql/syntax/parser_test.go @@ -3013,6 +3013,33 @@ func TestParse(t *testing.T) { Operation: "count_over_time", }, }, + { + // binop always includes vector matching. Default is `ignoring ()`, + // the zero value. + in: ` + sum(count_over_time({foo="bar"}[5m])) or vector(1) + `, + exp: mustNewBinOpExpr( + OpTypeOr, + &BinOpOptions{ + VectorMatching: &VectorMatching{Card: CardOneToOne}, + }, + mustNewVectorAggregationExpr(newRangeAggregationExpr( + &LogRange{ + Left: &MatchersExpr{ + Mts: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + Interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil), + "sum", + &Grouping{}, + nil, + ), + NewVectorExpr("1"), + ), + }, } { t.Run(tc.in, func(t *testing.T) { ast, err := ParseExpr(tc.in)