Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 20 additions & 30 deletions doctests/query_agg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,15 +228,13 @@ func ExampleClient_query_agg() {
"idx:bicycle",
"@condition:{new}",
&redis.FTAggregateOptions{
Apply: []redis.FTAggregateApply{
{
Steps: []redis.FTAggregateStep{
{Load: &redis.FTAggregateLoad{Field: "__key"}},
{Load: &redis.FTAggregateLoad{Field: "price"}},
{Apply: &redis.FTAggregateApply{
Field: "@price - (@price * 0.1)",
As: "discounted",
},
},
Load: []redis.FTAggregateLoad{
{Field: "__key"},
{Field: "price"},
}},
},
},
).Result()
Expand Down Expand Up @@ -271,17 +269,13 @@ func ExampleClient_query_agg() {
res2, err := rdb.FTAggregateWithArgs(ctx,
"idx:bicycle", "*",
&redis.FTAggregateOptions{
Load: []redis.FTAggregateLoad{
{Field: "price"},
},
Apply: []redis.FTAggregateApply{
{
Steps: []redis.FTAggregateStep{
{Load: &redis.FTAggregateLoad{Field: "price"}},
{Apply: &redis.FTAggregateApply{
Field: "@price<1000",
As: "price_category",
},
},
GroupBy: []redis.FTAggregateGroupBy{
{
}},
{GroupBy: &redis.FTAggregateGroupBy{
Fields: []interface{}{"@condition"},
Reduce: []redis.FTAggregateReducer{
{
Expand All @@ -290,7 +284,7 @@ func ExampleClient_query_agg() {
As: "num_affordable",
},
},
},
}},
},
},
).Result()
Expand Down Expand Up @@ -323,22 +317,20 @@ func ExampleClient_query_agg() {
res3, err := rdb.FTAggregateWithArgs(ctx,
"idx:bicycle", "*",
&redis.FTAggregateOptions{
Apply: []redis.FTAggregateApply{
{
Steps: []redis.FTAggregateStep{
{Apply: &redis.FTAggregateApply{
Field: "'bicycle'",
As: "type",
},
},
GroupBy: []redis.FTAggregateGroupBy{
{
}},
{GroupBy: &redis.FTAggregateGroupBy{
Fields: []interface{}{"@type"},
Reduce: []redis.FTAggregateReducer{
{
Reducer: redis.SearchCount,
As: "num_total",
},
},
},
}},
},
},
).Result()
Expand All @@ -363,11 +355,9 @@ func ExampleClient_query_agg() {
res4, err := rdb.FTAggregateWithArgs(ctx,
"idx:bicycle", "*",
&redis.FTAggregateOptions{
Load: []redis.FTAggregateLoad{
{Field: "__key"},
},
GroupBy: []redis.FTAggregateGroupBy{
{
Steps: []redis.FTAggregateStep{
{Load: &redis.FTAggregateLoad{Field: "__key"}},
{GroupBy: &redis.FTAggregateGroupBy{
Fields: []interface{}{"@condition"},
Reduce: []redis.FTAggregateReducer{
{
Expand All @@ -376,7 +366,7 @@ func ExampleClient_query_agg() {
As: "bicycles",
},
},
},
}},
},
},
).Result()
Expand Down
32 changes: 32 additions & 0 deletions example/search-aggregate-steps/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# FT.AGGREGATE step-based pipeline example

This example demonstrates `FTAggregateOptions.Steps`, which lets `LOAD`,
`APPLY`, `GROUPBY` and `SORTBY` appear multiple times in any order in an
`FT.AGGREGATE` pipeline.

The main motivation is shard-level trimming: running a `SORTBY ... MAX N`
**before** a `GROUPBY` so each shard only ships its top-N rows onward.

## Prerequisites

A Redis instance with the Search module loaded (Redis 8+, or Redis Stack)
listening on `localhost:6379`.

The client is configured with `Protocol: 2` because the RESP3 response shape
of `FT.AGGREGATE` is still marked unstable; set `UnstableResp3: true` instead
if you want to use RESP3.

## Run

```shell
go run .
```

The example:

1. Creates an index `idx:products` over hashes prefixed with `product:`.
2. Seeds 10 products with `category`, `brand`, `price`, `quantity`, `rating`.
3. Runs three aggregations:
- **Trim before GROUPBY** — `SORTBY @rating DESC MAX 5` → `GROUPBY @category` → `SORTBY @price_total DESC`.
- **Multi-stage pipeline** — `LOAD` → `LOAD` → `APPLY` → `SORTBY MAX` → `GROUPBY` → `APPLY` → `SORTBY`.
- **AggregateBuilder** — the same first query expressed via the fluent `NewAggregateBuilder` API.
12 changes: 12 additions & 0 deletions example/search-aggregate-steps/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
module github.com/redis/go-redis/example/search-aggregate-steps

go 1.24

replace github.com/redis/go-redis/v9 => ../..

require github.com/redis/go-redis/v9 v9.18.0

require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
)
20 changes: 20 additions & 0 deletions example/search-aggregate-steps/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE=
github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs=
github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
150 changes: 150 additions & 0 deletions example/search-aggregate-steps/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Example demonstrating the step-based FT.AGGREGATE API.
//
// FTAggregateOptions.Steps lets LOAD, APPLY, GROUPBY and SORTBY appear
// multiple times in any order. This is useful for shard-level trimming
// (SORTBY ... MAX N) before GROUPBY and multi-stage pipelines.
//
// Requires a Redis instance with the Search module (e.g. Redis 8+).
package main

import (
"context"
"fmt"
"log"

"github.com/redis/go-redis/v9"
)

const indexName = "idx:products"

func main() {
ctx := context.Background()

rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Protocol: 2,
})
defer rdb.Close()

if err := rdb.FlushDB(ctx).Err(); err != nil {
log.Fatalf("flushdb: %v", err)
}
createIndex(ctx, rdb)
seedProducts(ctx, rdb)

fmt.Println("# 1. Trim before GROUPBY (real-world use case)")
trimBeforeGroup(ctx, rdb)

fmt.Println("\n# 2. Multi-stage pipeline (LOAD -> APPLY -> SORTBY -> GROUPBY -> APPLY -> SORTBY)")
multiStagePipeline(ctx, rdb)

fmt.Println("\n# 3. Same as #1 but using AggregateBuilder")
withBuilder(ctx, rdb)
}

func createIndex(ctx context.Context, rdb *redis.Client) {
_, err := rdb.FTCreate(ctx, indexName,
&redis.FTCreateOptions{OnHash: true, Prefix: []interface{}{"product:"}},
&redis.FieldSchema{FieldName: "category", FieldType: redis.SearchFieldTypeTag},
&redis.FieldSchema{FieldName: "brand", FieldType: redis.SearchFieldTypeTag},
&redis.FieldSchema{FieldName: "price", FieldType: redis.SearchFieldTypeNumeric, Sortable: true},
&redis.FieldSchema{FieldName: "quantity", FieldType: redis.SearchFieldTypeNumeric, Sortable: true},
&redis.FieldSchema{FieldName: "rating", FieldType: redis.SearchFieldTypeNumeric, Sortable: true},
).Result()
if err != nil {
log.Fatalf("ftcreate: %v", err)
}
}

func seedProducts(ctx context.Context, rdb *redis.Client) {
products := []map[string]interface{}{
{"category": "bike", "brand": "Velorim", "price": 270, "quantity": 12, "rating": 4.5},
{"category": "bike", "brand": "Velorim", "price": 810, "quantity": 3, "rating": 4.1},
{"category": "bike", "brand": "Bicyk", "price": 2300, "quantity": 1, "rating": 4.9},
{"category": "bike", "brand": "Bicyk", "price": 430, "quantity": 7, "rating": 3.8},
{"category": "helmet", "brand": "Nord", "price": 120, "quantity": 25, "rating": 4.7},
{"category": "helmet", "brand": "Nord", "price": 80, "quantity": 40, "rating": 4.0},
{"category": "helmet", "brand": "Peaknetic", "price": 200, "quantity": 10, "rating": 4.8},
{"category": "light", "brand": "Peaknetic", "price": 45, "quantity": 60, "rating": 4.3},
{"category": "light", "brand": "Nord", "price": 30, "quantity": 100, "rating": 3.9},
{"category": "light", "brand": "Velorim", "price": 65, "quantity": 35, "rating": 4.6},
}
for i, p := range products {
key := fmt.Sprintf("product:%d", i)
if err := rdb.HSet(ctx, key, p).Err(); err != nil {
log.Fatalf("hset %s: %v", key, err)
}
}
}

// trimBeforeGroup shows SORTBY ... MAX N applied BEFORE a GROUPBY
// (the pattern that the old pipeline-ordered API could not express).
func trimBeforeGroup(ctx context.Context, rdb *redis.Client) {
opts := &redis.FTAggregateOptions{
Steps: []redis.FTAggregateStep{
// Keep only the top 5 highest-rated products per shard.
{SortBy: &redis.FTAggregateSortByStep{
Fields: []redis.FTAggregateSortBy{{FieldName: "@rating", Desc: true}},
Max: 5,
}},
// Sum their price per category.
{GroupBy: &redis.FTAggregateGroupBy{
Fields: []interface{}{"@category"},
Reduce: []redis.FTAggregateReducer{
{Reducer: redis.SearchSum, Args: []interface{}{"@price"}, As: "price_total"},
{Reducer: redis.SearchCount, As: "n"},
},
}},
// Sort categories by the aggregated total.
{SortBy: &redis.FTAggregateSortByStep{
Fields: []redis.FTAggregateSortBy{{FieldName: "@price_total", Desc: true}},
}},
},
}
printRows(rdb.FTAggregateWithArgs(ctx, indexName, "*", opts).Result())
}

func multiStagePipeline(ctx context.Context, rdb *redis.Client) {
opts := &redis.FTAggregateOptions{
Steps: []redis.FTAggregateStep{
{Load: &redis.FTAggregateLoad{Field: "@price"}},
{Load: &redis.FTAggregateLoad{Field: "@quantity"}},
{Apply: &redis.FTAggregateApply{Field: "@price * @quantity", As: "stock_value"}},
{SortBy: &redis.FTAggregateSortByStep{
Fields: []redis.FTAggregateSortBy{{FieldName: "@stock_value", Desc: true}},
Max: 8,
}},
{GroupBy: &redis.FTAggregateGroupBy{
Fields: []interface{}{"@brand"},
Reduce: []redis.FTAggregateReducer{
{Reducer: redis.SearchSum, Args: []interface{}{"@stock_value"}, As: "brand_value"},
},
}},
{Apply: &redis.FTAggregateApply{Field: "floor(@brand_value)", As: "brand_value"}},
{SortBy: &redis.FTAggregateSortByStep{
Fields: []redis.FTAggregateSortBy{{FieldName: "@brand_value", Desc: true}},
}},
},
}
printRows(rdb.FTAggregateWithArgs(ctx, indexName, "*", opts).Result())
}

func withBuilder(ctx context.Context, rdb *redis.Client) {
res, err := rdb.NewAggregateBuilder(ctx, indexName, "*").
SortBy("@rating", false).SortByMax(5).
GroupBy("@category").
ReduceAs(redis.SearchSum, "price_total", "@price").
ReduceAs(redis.SearchCount, "n").
SortBy("@price_total", false).
Run()
printRows(res, err)
}

func printRows(res *redis.FTAggregateResult, err error) {
if err != nil {
log.Fatalf("aggregate: %v", err)
}
for _, row := range res.Rows {
fmt.Printf(" %v\n", row.Fields)
}
}
Loading
Loading