Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 57 additions & 35 deletions internal/storage/v2/clickhouse/tracestore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,44 +176,16 @@
query tracestore.TraceQueryParams,
) iter.Seq2[[]tracestore.FoundTraceID, error] {
return func(yield func([]tracestore.FoundTraceID, error) bool) {
q := sql.SearchTraceIDs
args := []any{}

if query.ServiceName != "" {
q += " AND s.service_name = ?"
args = append(args, query.ServiceName)
}
if query.OperationName != "" {
q += " AND s.name = ?"
args = append(args, query.OperationName)
}
if query.DurationMin > 0 {
q += " AND s.duration >= ?"
args = append(args, query.DurationMin.Nanoseconds())
limit := query.SearchDepth
if limit == 0 {
limit = r.config.DefaultSearchDepth
}
if query.DurationMax > 0 {
q += " AND s.duration <= ?"
args = append(args, query.DurationMax.Nanoseconds())
}
if !query.StartTimeMin.IsZero() {
q += " AND s.start_time >= ?"
args = append(args, query.StartTimeMin)
}
if !query.StartTimeMax.IsZero() {
q += " AND s.start_time <= ?"
args = append(args, query.StartTimeMax)
if limit > r.config.MaxSearchDepth {
yield(nil, fmt.Errorf("search depth %d exceeds maximum allowed %d", limit, r.config.MaxSearchDepth))
return
}

q += " LIMIT ?"
if query.SearchDepth > 0 {
if query.SearchDepth > r.config.MaxSearchDepth {
yield(nil, fmt.Errorf("search depth %d exceeds maximum allowed %d", query.SearchDepth, r.config.MaxSearchDepth))
return
}
args = append(args, query.SearchDepth)
} else {
args = append(args, r.config.DefaultSearchDepth)
}
q, args := r.buildFindTraceIDsQuery(query, limit)

rows, err := r.conn.Query(ctx, q, args...)
if err != nil {
Expand All @@ -230,3 +202,53 @@
}
}
}

func (r *Reader) buildFindTraceIDsQuery(query tracestore.TraceQueryParams, limit int) (string, []any) {

Check failure on line 206 in internal/storage/v2/clickhouse/tracestore/reader.go

View workflow job for this annotation

GitHub Actions / lint

unused-receiver: method receiver 'r' is not referenced in method's body, consider removing or renaming it as _ (revive)
q := sql.SearchTraceIDs
args := []any{}

if query.ServiceName != "" {
q += " AND s.service_name = ?"
args = append(args, query.ServiceName)
}
if query.OperationName != "" {
q += " AND s.name = ?"
args = append(args, query.OperationName)
}
if query.DurationMin > 0 {
q += " AND s.duration >= ?"
args = append(args, query.DurationMin.Nanoseconds())
}
if query.DurationMax > 0 {
q += " AND s.duration <= ?"
args = append(args, query.DurationMax.Nanoseconds())
}
if !query.StartTimeMin.IsZero() {
q += " AND s.start_time >= ?"
args = append(args, query.StartTimeMin)
}
if !query.StartTimeMax.IsZero() {
q += " AND s.start_time <= ?"
args = append(args, query.StartTimeMax)
}

if query.Attributes.Len() > 0 {
query.Attributes.Range(func(k string, v pcommon.Value) bool {
Comment thread
mahadzaryab1 marked this conversation as resolved.
Outdated
if v.Type() == pcommon.ValueTypeStr {
val := v.Str()
q += " AND ("
q += "arrayExists((key, value) -> key = ? AND value = ?, s.str_attributes.key, s.str_attributes.value)"
q += " OR "
q += "arrayExists((key, value) -> key = ? AND value = ?, s.resource_str_attributes.key, s.resource_str_attributes.value)"
q += ")"
args = append(args, k, val, k, val)
}
return true
})
}

q += " LIMIT ?"
args = append(args, limit)

return q, args
}
30 changes: 25 additions & 5 deletions internal/storage/v2/clickhouse/tracestore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,20 +550,32 @@ SELECT DISTINCT
t.end
FROM spans s
LEFT JOIN trace_id_timestamps t ON s.trace_id = t.trace_id
WHERE 1=1 AND s.service_name = ? AND s.name = ? AND s.duration >= ? AND s.duration <= ? AND s.start_time >= ? AND s.start_time <= ? LIMIT ?`,
WHERE 1=1` +
` AND s.service_name = ?` +
` AND s.name = ?` +
` AND s.duration >= ?` +
` AND s.duration <= ?` +
` AND s.start_time >= ?` +
` AND s.start_time <= ?` +
` AND (arrayExists((key, value) -> key = ? AND value = ?, s.str_attributes.key, s.str_attributes.value)` +
` OR arrayExists((key, value) -> key = ? AND value = ?, s.resource_str_attributes.key, s.resource_str_attributes.value))` +
` LIMIT ?`,
rows: &testRows[[]any]{
data: testTraceIDsData,
scanFn: scanTraceIDFn(),
},
}
reader := NewReader(driver, testReaderConfig)
attributes := pcommon.NewMap()
attributes.PutStr("http.method", "GET")
iter := reader.FindTraceIDs(context.Background(), tracestore.TraceQueryParams{
ServiceName: "serviceA",
OperationName: "operationA",
DurationMin: 1 * time.Nanosecond,
DurationMax: 1 * time.Second,
StartTimeMin: now.Add(-1 * time.Hour),
StartTimeMax: now,
Attributes: attributes,
SearchDepth: 5,
})
ids, err := jiter.FlattenWithErrors(iter)
Expand Down Expand Up @@ -619,7 +631,9 @@ func TestFindTraceIDs_YieldFalseOnSuccessStopsIteration(t *testing.T) {
}

reader := NewReader(conn, testReaderConfig)
findTraceIDsIter := reader.FindTraceIDs(context.Background(), tracestore.TraceQueryParams{})
findTraceIDsIter := reader.FindTraceIDs(context.Background(), tracestore.TraceQueryParams{
Attributes: pcommon.NewMap(),
})

var gotTraceIDs []tracestore.FoundTraceID
findTraceIDsIter(func(traceIDs []tracestore.FoundTraceID, err error) bool {
Expand Down Expand Up @@ -659,7 +673,9 @@ func TestFindTraceIDs_ScanErrorContinues(t *testing.T) {
}

reader := NewReader(conn, testReaderConfig)
findTraceIDsIter := reader.FindTraceIDs(context.Background(), tracestore.TraceQueryParams{})
findTraceIDsIter := reader.FindTraceIDs(context.Background(), tracestore.TraceQueryParams{
Attributes: pcommon.NewMap(),
})

expected := []tracestore.FoundTraceID{
{
Expand Down Expand Up @@ -700,7 +716,9 @@ func TestFindTraceIDs_DecodeErrorContinues(t *testing.T) {
}

reader := NewReader(conn, ReaderConfig{})
findTraceIDsIter := reader.FindTraceIDs(context.Background(), tracestore.TraceQueryParams{})
findTraceIDsIter := reader.FindTraceIDs(context.Background(), tracestore.TraceQueryParams{
Attributes: pcommon.NewMap(),
})

expectedValidTraceIDs := []tracestore.FoundTraceID{
{
Expand Down Expand Up @@ -779,7 +797,9 @@ func TestFindTraceIDs_ErrorCases(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
reader := NewReader(test.driver, ReaderConfig{})
iter := reader.FindTraceIDs(context.Background(), tracestore.TraceQueryParams{})
iter := reader.FindTraceIDs(context.Background(), tracestore.TraceQueryParams{
Attributes: pcommon.NewMap(),
})
_, err := jiter.FlattenWithErrors(iter)
require.ErrorContains(t, err, test.expectedErr)
})
Expand Down
Loading