Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions internal/storage/v2/clickhouse/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import (
"go.opentelemetry.io/collector/config/configoptional"
)

const (
defaultProtocol = "native"
defaultDatabase = "jaeger"
defaultSearchDepth = 1000
defaultMaxSearchDepth = 10000
)

type Configuration struct {
// Protocol is the protocol to use to connect to ClickHouse.
// Supported values are "native" and "http". Default is "native".
Expand All @@ -25,6 +32,13 @@ type Configuration struct {
DialTimeout time.Duration `mapstructure:"dial_timeout"`
// CreateSchema, if set to true, will create the ClickHouse schema if it does not exist.
CreateSchema bool `mapstructure:"create_schema"`
// DefaultSearchDepth is the default search depth for queries.
// This is the maximum number of trace IDs that will be returned when searching for traces
// if a limit is not specified in the query.
DefaultSearchDepth int `mapstructure:"default_search_depth"`
// MaxSearchDepth is the maximum allowed search depth for queries.
// This limits the number of trace IDs that can be returned when searching for traces.
MaxSearchDepth int `mapstructure:"max_search_depth"`
// TODO: add more settings
}

Expand All @@ -37,3 +51,18 @@ func (cfg *Configuration) Validate() error {
_, err := govalidator.ValidateStruct(cfg)
return err
}

func (cfg *Configuration) applyDefaults() {
Comment thread
yurishkuro marked this conversation as resolved.
if cfg.Protocol == "" {
cfg.Protocol = "native"
}
if cfg.Database == "" {
cfg.Database = defaultDatabase
}
if cfg.DefaultSearchDepth == 0 {
cfg.DefaultSearchDepth = defaultSearchDepth
}
if cfg.MaxSearchDepth == 0 {
cfg.MaxSearchDepth = defaultMaxSearchDepth
}
}
10 changes: 10 additions & 0 deletions internal/storage/v2/clickhouse/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,13 @@ func TestValidate(t *testing.T) {
})
}
}

func TestConfigurationApplyDefaults(t *testing.T) {
config := &Configuration{}
config.applyDefaults()

require.Equal(t, defaultProtocol, config.Protocol)
require.Equal(t, defaultDatabase, config.Database)
require.Equal(t, defaultSearchDepth, config.DefaultSearchDepth)
require.Equal(t, defaultMaxSearchDepth, config.MaxSearchDepth)
}
6 changes: 5 additions & 1 deletion internal/storage/v2/clickhouse/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Factory struct {
}

func NewFactory(ctx context.Context, cfg Configuration, telset telemetry.Settings) (*Factory, error) {
cfg.applyDefaults()
f := &Factory{
config: cfg,
telset: telset,
Expand Down Expand Up @@ -88,7 +89,10 @@ func NewFactory(ctx context.Context, cfg Configuration, telset telemetry.Setting
}

func (f *Factory) CreateTraceReader() (tracestore.Reader, error) {
return chtracestore.NewReader(f.conn), nil
return chtracestore.NewReader(f.conn, chtracestore.ReaderConfig{
DefaultSearchDepth: f.config.DefaultSearchDepth,
MaxSearchDepth: f.config.MaxSearchDepth,
}), nil
}

func (f *Factory) CreateTraceWriter() (tracestore.Writer, error) {
Expand Down
7 changes: 7 additions & 0 deletions internal/storage/v2/clickhouse/sql/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ WHERE
trace_id = ?
`

// SearchTraceIDs is the base SQL fragment used by FindTraceIDs.
//
// The query begins with a no-op predicate (`WHERE 1=1`) so that additional
// filters can be appended unconditionally using `AND` without needing to check
// whether this is the first WHERE clause.
const SearchTraceIDs = `SELECT DISTINCT trace_id FROM spans WHERE 1=1`

const SelectServices = `
SELECT DISTINCT
name
Expand Down
79 changes: 72 additions & 7 deletions internal/storage/v2/clickhouse/tracestore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ package tracestore

import (
"context"
"encoding/hex"
"fmt"
"iter"

"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/storage/v2/api/tracestore"
Expand All @@ -18,17 +20,27 @@ import (

var _ tracestore.Reader = (*Reader)(nil)

type ReaderConfig struct {
// DefaultSearchDepth is the default number of trace IDs to return when searching for traces.
// This value is used when the SearchDepth field in TraceQueryParams is not set.
DefaultSearchDepth int
// MaxSearchDepth is the maximum number of trace IDs that can be returned when searching for traces.
// This value is used to limit the SearchDepth field in TraceQueryParams.
MaxSearchDepth int
}

type Reader struct {
conn driver.Conn
conn driver.Conn
config ReaderConfig
}

// NewReader returns a new Reader instance that uses the given ClickHouse connection
// to read trace data.
//
// The provided connection is used exclusively for reading traces, meaning it is safe
// to enable instrumentation on the connection without risk of recursively generating traces.
func NewReader(conn driver.Conn) *Reader {
return &Reader{conn: conn}
func NewReader(conn driver.Conn, cfg ReaderConfig) *Reader {
return &Reader{conn: conn, config: cfg}
}

func (r *Reader) GetTraces(
Expand Down Expand Up @@ -129,9 +141,62 @@ func (*Reader) FindTraces(
panic("not implemented")
}

func (*Reader) FindTraceIDs(
context.Context,
tracestore.TraceQueryParams,
func readRowIntoTraceID(rows driver.Rows) ([]tracestore.FoundTraceID, error) {
var str string

if err := rows.Scan(&str); err != nil {
return nil, fmt.Errorf("failed to scan row: %w", err)
}

b, err := hex.DecodeString(str)
if err != nil {
return nil, fmt.Errorf("failed to decode trace ID: %w", err)
}

return []tracestore.FoundTraceID{
{TraceID: pcommon.TraceID(b)},
}, nil
}

func (r *Reader) FindTraceIDs(
ctx context.Context,
query tracestore.TraceQueryParams,
) iter.Seq2[[]tracestore.FoundTraceID, error] {
panic("not implemented")
return func(yield func([]tracestore.FoundTraceID, error) bool) {
q := sql.SearchTraceIDs
args := []any{}

if query.ServiceName != "" {
q += " AND service_name = ?"
args = append(args, query.ServiceName)
}
if query.OperationName != "" {
q += " AND name = ?"
args = append(args, query.OperationName)
}
q += " LIMIT ?"
if query.SearchDepth > 0 {
Comment thread
mahadzaryab1 marked this conversation as resolved.
if query.SearchDepth > r.config.MaxSearchDepth {
yield(nil, fmt.Errorf("search depth %d exceeds maximum allowed %d", query.SearchDepth, r.config.MaxSearchDepth))
return
}
args = append(args, query.SearchDepth)
} else {
args = append(args, r.config.DefaultSearchDepth)
}

rows, err := r.conn.Query(ctx, q, args...)
if err != nil {
yield(nil, fmt.Errorf("failed to query trace IDs: %w", err))
return
}
defer rows.Close()

for rows.Next() {
traceID, err := readRowIntoTraceID(rows)
if !yield(traceID, err) {
return
}
}
}
}
Loading
Loading