Skip to content
Closed
Changes from 2 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
4bc4399
Merge pull request #1 from ClickHouse/clickhouse-exporter-improvements
mauidude Mar 23, 2023
8a07ed4
fix dockerfile
mauidude Mar 23, 2023
864f839
inline asyncs
Mar 24, 2023
cc6a2ca
inline asyncs
Mar 24, 2023
6dd3c69
inline asyncs
Mar 28, 2023
ffe1716
inline asyncs
Mar 29, 2023
69e617b
inline asyncs
Mar 29, 2023
83899a5
Merge pull request #4 from ClickHouse/clickhouse-exporter-improvements
kgoralski Mar 29, 2023
ffd6714
inline asyncs
Mar 29, 2023
b3ed964
inline asyncs
Mar 29, 2023
69b1ebd
inline asyncs
Mar 29, 2023
bbe2a1e
inline asyncs
Mar 29, 2023
f83a8a4
inline asyncs
Mar 29, 2023
d867dd9
inline asyncs
Mar 29, 2023
0d347b7
inline asyncs
Mar 29, 2023
4333f4d
inline asyncs
Mar 29, 2023
c241d5b
Revert "inline asyncs"
Mar 29, 2023
24fb6da
Revert "inline asyncs"
Mar 29, 2023
a26f66e
Revert "inline asyncs"
Mar 29, 2023
b311a31
Revert "inline asyncs"
Mar 29, 2023
0dc214d
Revert "inline asyncs"
Mar 29, 2023
4d6a91b
Revert "inline asyncs"
Mar 29, 2023
5ae2d01
Revert "inline asyncs"
Mar 29, 2023
1dc1e5b
Revert "inline asyncs"
Mar 29, 2023
05ac491
inline asyncs
Mar 29, 2023
afec775
inline asyncs
Mar 29, 2023
08c2777
inline asyncs
Mar 29, 2023
1828a67
inline asyncs
Mar 29, 2023
255dd5f
inline asyncs
Mar 29, 2023
5c17459
inline asyncs
Mar 29, 2023
aa55f73
inline asyncs
Mar 29, 2023
41539e3
inline asyncs
Mar 29, 2023
9f7e209
inline asyncs
Mar 29, 2023
988140a
inline asyncs
Mar 29, 2023
8dd5f69
inline asyncs
Mar 29, 2023
cc99bc4
inline asyncs
Mar 29, 2023
aed37eb
inline asyncs
Mar 29, 2023
34dc41b
inline asyncs
Mar 29, 2023
8089797
Revert "inline asyncs"
Mar 29, 2023
ce0cd73
inline asyncs
Mar 29, 2023
a9b5ad2
inline asyncs
Mar 29, 2023
06689a6
inline asyncs
Mar 29, 2023
c36bbed
inline asyncs
Mar 30, 2023
d088354
inline asyncs
Mar 30, 2023
8f886b3
inline asyncs
Mar 30, 2023
8bbaf98
inline asyncs
Mar 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 94 additions & 13 deletions exporter/clickhouseexporter/exporter_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ import (
"context"
"database/sql"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"log"
"net/url"
"strings"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -33,24 +35,26 @@ import (
)

type logsExporter struct {
client *sql.DB
insertSQL string
client *sql.DB
nativeClient clickhouse.Conn
insertSQL string

logger *zap.Logger
cfg *Config
}

func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) {
client, err := newClickHouseConn(cfg)
client, nativeClient, err := newClickHouseConn(cfg)
if err != nil {
return nil, err
}

return &logsExporter{
client: client,
insertSQL: renderInsertLogsSQL(cfg),
logger: logger,
cfg: cfg,
client: client,
nativeClient: nativeClient,
insertSQL: renderInsertLogsSQL(cfg),
logger: logger,
cfg: cfg,
}, nil
}

Expand All @@ -73,14 +77,83 @@ func (e *logsExporter) shutdown(_ context.Context) error {
return nil
}

func (e *logsExporter) pushNativeLogsData(ctx context.Context, ld plog.Logs) error {
start := time.Now()

err := func() error {

batch, err := e.nativeClient.PrepareBatch(ctx, e.insertSQL)
if err != nil {
return fmt.Errorf("Prepare:%w", err)
}

var serviceName string
resAttr := make(map[string]string)

resourceLogs := ld.ResourceLogs()
for i := 0; i < resourceLogs.Len(); i++ {
logs := resourceLogs.At(i)
res := logs.Resource()

attrs := res.Attributes()
attributesToMap(attrs, resAttr)

if v, ok := attrs.Get(conventions.AttributeServiceName); ok {
serviceName = v.Str()
}
for j := 0; j < logs.ScopeLogs().Len(); j++ {
rs := logs.ScopeLogs().At(j).LogRecords()
for k := 0; k < rs.Len(); k++ {
r := rs.At(k)

logAttr := make(map[string]string, attrs.Len())
attributesToMap(r.Attributes(), logAttr)

err = batch.Append(
r.Timestamp().AsTime(),
traceutil.TraceIDToHexOrEmptyString(r.TraceID()),
traceutil.SpanIDToHexOrEmptyString(r.SpanID()),
uint32(r.Flags()),
r.SeverityText(),
int32(r.SeverityNumber()),
serviceName,
r.Body().AsString(),
resAttr,
logAttr,
)
if err != nil {
return fmt.Errorf("Append:%w", err)
}
}
}

// clear map for reuse
for k := range resAttr {
delete(resAttr, k)
}
}

if err := batch.Send(); err != nil {
_ = batch.Abort()
return fmt.Errorf("Send:%w", err)
}

return nil
}()

duration := time.Since(start)
e.logger.Info("insert logs", zap.Int("records", ld.LogRecordCount()),
zap.String("cost", duration.String()))
return err
}

func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
start := time.Now()
err := func() error {
scope, err := e.client.Begin()
if err != nil {
return fmt.Errorf("Begin:%w", err)
}

batch, err := scope.Prepare(e.insertSQL)
if err != nil {
return fmt.Errorf("Prepare:%w", err)
Expand Down Expand Up @@ -176,7 +249,8 @@ SETTINGS index_granularity=8192, ttl_only_drop_parts = 1;
`

// language=ClickHouse SQL
insertLogsSQLTemplate = `INSERT INTO %s (
// SETTINGS async_insert=1, wait_for_async_insert=0
insertLogsSQLTemplate = `INSERT INTO %s SETTINGS (
Timestamp,
TraceId,
SpanId,
Expand All @@ -203,7 +277,7 @@ func newClickHouseClient(cfg *Config) (*sql.DB, error) {
}

// used by logs:
func newClickHouseConn(cfg *Config) (*sql.DB, error) {
func newClickHouseConn(cfg *Config) (*sql.DB, driver.Conn, error) {
endpoint := cfg.Endpoint

if len(cfg.ConnectionParams) > 0 {
Expand All @@ -223,8 +297,11 @@ func newClickHouseConn(cfg *Config) (*sql.DB, error) {

opts, err := clickhouse.ParseDSN(endpoint)
if err != nil {
return nil, fmt.Errorf("unable to parse endpoint: %w", err)
return nil, nil, fmt.Errorf("unable to parse endpoint: %w", err)
}
// TODO config
opts.Settings["async_insert"] = 1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if these appear in the query string they get applied as settings

opts.Settings["wait_for_async_insert"] = 0

opts.Auth = clickhouse.Auth{
Database: cfg.Database,
Expand All @@ -234,7 +311,11 @@ func newClickHouseConn(cfg *Config) (*sql.DB, error) {

// can return a "bad" connection if misconfigured, we won't know
// until a Ping, Exec, etc.. is done
return clickhouse.OpenDB(opts), nil
conn, err := clickhouse.Open(opts)
if err != nil {
log.Fatal(err)
}
return clickhouse.OpenDB(opts), conn, nil
}

func createDatabase(ctx context.Context, cfg *Config) error {
Expand Down